diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 3d1aea3ffa34..c6bf75f4d251 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -30,7 +30,6 @@ import org.apache.hudi.exception.HoodieException import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} import org.apache.hudi.storage.{StoragePath, StoragePathInfo} import org.apache.hudi.util.JFunction - import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession @@ -45,8 +44,8 @@ import org.apache.spark.unsafe.types.UTF8String import java.text.SimpleDateFormat import java.util.stream.Collectors import javax.annotation.concurrent.NotThreadSafe - import scala.collection.JavaConverters._ +import scala.util.control.Breaks.{break, breakable} import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} @@ -226,8 +225,13 @@ case class HoodieFileIndex(spark: SparkSession, def filterFileSlices(dataFilters: Seq[Expression], partitionFilters: Seq[Expression], isPartitionPruned: Boolean = false) : Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = { - val (isPruned, prunedPartitionsAndFileSlices) = + var (isPruned, prunedPartitionsAndFileSlices) = prunePartitionsAndGetFileSlices(dataFilters, partitionFilters) + if (!isPruned && partitionFilters.nonEmpty && isExpressionIndexEnabled) { + // val expressionIndexSupport = getExpressionIndexSupport() + // lazy val queryReferencedColumns = collectReferencedColumns(spark, partitionFilters, schema) + prunedPartitionsAndFileSlices = filterFileSlices(partitionFilters, false, prunedPartitionsAndFileSlices) + } hasPushedDownPartitionPredicates = true // If there are no data filters, return all the file slices. @@ -236,64 +240,59 @@ case class HoodieFileIndex(spark: SparkSession, if (prunedPartitionsAndFileSlices.isEmpty || dataFilters.isEmpty || isPartitionPruned ) { prunedPartitionsAndFileSlices } else { - // Look up candidate files names in the col-stats or record level index, if all of the following conditions are true - // - Data-skipping is enabled - // - Col-Stats Index is present - // - Record-level Index is present - // - List of predicates (filters) is present - val candidateFilesNamesOpt: Option[Set[String]] = - lookupCandidateFilesInMetadataTable(dataFilters, prunedPartitionsAndFileSlices, isPruned) match { - case Success(opt) => opt - case Failure(e) => - logError("Failed to lookup candidate files in File Index", e) - - spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match { - case DataSkippingFailureMode.Fallback.value => Option.empty - case DataSkippingFailureMode.Strict.value => throw new HoodieException(e); - } - } + filterFileSlices(dataFilters, isPruned, prunedPartitionsAndFileSlices) + } + } + + private def filterFileSlices(dataFilters: Seq[Expression], isPruned: Boolean, prunedPartitionsAndFileSlices: Seq[(Option[PartitionPath], Seq[FileSlice])]) = { + // Look up candidate files names in the col-stats or record level index, if all of the following conditions are true + // - Data-skipping is enabled + // - Col-Stats Index is present + // - Record-level Index is present + // - List of predicates (filters) is present + val candidateFilesNamesOpt: Option[Set[String]] = + lookupCandidateFilesInMetadataTable(dataFilters, prunedPartitionsAndFileSlices, isPruned) - logDebug(s"Overlapping candidate files from Column Stats or Record Level Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}") + logDebug(s"Overlapping candidate files from Column Stats or Record Level Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}") - var totalFileSliceSize = 0 - var candidateFileSliceSize = 0 + var totalFileSliceSize = 0 + var candidateFileSliceSize = 0 - val prunedPartitionsAndFilteredFileSlices = prunedPartitionsAndFileSlices.map { - case (partitionOpt, fileSlices) => - // Filter in candidate files based on the col-stats or record level index lookup - val candidateFileSlices: Seq[FileSlice] = { - fileSlices.filter(fs => { + val prunedPartitionsAndFilteredFileSlices = prunedPartitionsAndFileSlices.map { + case (partitionOpt, fileSlices) => + // Filter in candidate files based on the col-stats or record level index lookup + val candidateFileSlices: Seq[FileSlice] = { + fileSlices.filter(fs => { + if (candidateFilesNamesOpt.isDefined) { + // if any file in the file slice is part of candidate file names, we need to inclue the file slice. + // in other words, if all files in the file slice is not present in candidate file names, we can filter out the file slice. val fileSliceFiles = fs.getLogFiles.map[String](JFunction.toJavaFunction[HoodieLogFile, String](lf => lf.getPath.getName)) .collect(Collectors.toSet[String]) val baseFileStatusOpt = getBaseFileInfo(Option.apply(fs.getBaseFile.orElse(null))) baseFileStatusOpt.exists(f => fileSliceFiles.add(f.getPath.getName)) - if (candidateFilesNamesOpt.isDefined) { - // if any file in the file slice is part of candidate file names, we need to inclue the file slice. - // in other words, if all files in the file slice is not present in candidate file names, we can filter out the file slice. - fileSliceFiles.stream().filter(fileSliceFile => candidateFilesNamesOpt.get.contains(fileSliceFile)).findAny().isPresent - } else { - true - } - }) - } + fileSliceFiles.stream().filter(fileSliceFile => candidateFilesNamesOpt.get.contains(fileSliceFile)).findAny().isPresent + } else { + true + } + }) + } - totalFileSliceSize += fileSlices.size - candidateFileSliceSize += candidateFileSlices.size - (partitionOpt, candidateFileSlices) - } + totalFileSliceSize += fileSlices.size + candidateFileSliceSize += candidateFileSlices.size + (partitionOpt, candidateFileSlices) + } - val skippingRatio = - if (!areAllFileSlicesCached) -1 - else if (getAllFiles().nonEmpty && totalFileSliceSize > 0) - (totalFileSliceSize - candidateFileSliceSize) / totalFileSliceSize.toDouble - else 0 + val skippingRatio = + if (!areAllFileSlicesCached) -1 + else if (getAllFiles().nonEmpty && totalFileSliceSize > 0) + (totalFileSliceSize - candidateFileSliceSize) / totalFileSliceSize.toDouble + else 0 - logInfo(s"Total file slices: $totalFileSliceSize; " + - s"candidate file slices after data skipping: $candidateFileSliceSize; " + - s"skipping percentage $skippingRatio") + logInfo(s"Total file slices: $totalFileSliceSize; " + + s"candidate file slices after data skipping: $candidateFileSliceSize; " + + s"skipping percentage $skippingRatio") - prunedPartitionsAndFilteredFileSlices - } + prunedPartitionsAndFilteredFileSlices } /** @@ -346,6 +345,10 @@ case class HoodieFileIndex(spark: SparkSession, (Option.apply(partition), fileSlices.asScala.map(f => f.withLogFiles(includeLogFiles)).toSeq) }).toSeq) } + private def getExpressionIndexSupport(): ExpressionIndexSupport = { + indicesSupport.collectFirst({ case indexSupport: ExpressionIndexSupport => indexSupport }).get + } + /** * In the fast bootstrap read code path, it gets the path info for the bootstrap base file instead of * skeleton file. Returns path info for the base file if available. @@ -379,31 +382,44 @@ case class HoodieFileIndex(spark: SparkSession, // scalastyle:off return private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression], prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])], - shouldPushDownFilesFilter: Boolean): Try[Option[Set[String]]] = Try { - // NOTE: For column stats, Data Skipping is only effective when it references columns that are indexed w/in - // the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping: - // - Expressions on top-level column's fields (ie, for ex filters like "struct.field > 0", since - // CSI only contains stats for top-level columns, in this case for "struct") - // - Any expression not directly referencing top-level column (for ex, sub-queries, since there's - // nothing CSI in particular could be applied for) - // For record index, Data Skipping is only effective when one of the query filter is of type EqualTo - // or IN query on simple record keys. In such a case the record index is used to filter the file slices - // and candidate files are obtained from these file slices. - - lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema) - if (isDataSkippingEnabled) { - for(indexSupport: SparkBaseIndexSupport <- indicesSupport) { - if (indexSupport.isIndexAvailable && indexSupport.supportsQueryType(options)) { - val prunedFileNames = indexSupport.computeCandidateIsStrict(spark, this, queryFilters, queryReferencedColumns, - prunedPartitionsAndFileSlices, shouldPushDownFilesFilter) - if (prunedFileNames.nonEmpty) { - return Try(prunedFileNames) + shouldPushDownFilesFilter: Boolean): Option[Set[String]] = { + Try { + // NOTE: For column stats, Data Skipping is only effective when it references columns that are indexed w/in + // the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping: + // - Expressions on top-level column's fields (ie, for ex filters like "struct.field > 0", since + // CSI only contains stats for top-level columns, in this case for "struct") + // - Any expression not directly referencing top-level column (for ex, sub-queries, since there's + // nothing CSI in particular could be applied for) + // For record index, Data Skipping is only effective when one of the query filter is of type EqualTo + // or IN query on simple record keys. In such a case the record index is used to filter the file slices + // and candidate files are obtained from these file slices. + lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema) + var prunedFileNamesOpt: Option[Set[String]] = Option.empty + if (isDataSkippingEnabled) { + breakable { + for (indexSupport: SparkBaseIndexSupport <- indicesSupport) { + if (indexSupport.isIndexAvailable && indexSupport.supportsQueryType(options)) { + prunedFileNamesOpt = indexSupport.computeCandidateIsStrict(spark, this, queryFilters, queryReferencedColumns, + prunedPartitionsAndFileSlices, shouldPushDownFilesFilter) + if (prunedFileNamesOpt.nonEmpty) { + break() + } + } } } } + validateConfig() + prunedFileNamesOpt + } match { + case Success(opt) => opt + case Failure(e) => + logError("Failed to lookup candidate files in File Index", e) + + spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match { + case DataSkippingFailureMode.Fallback.value => Option.empty + case DataSkippingFailureMode.Strict.value => throw new HoodieException(e); + } } - validateConfig() - Option.empty } override def refresh(): Unit = {