[HUDI-8849] Reenable TestExpressionIndex (apache#12624)
Author: lokeshj1703 · Jan 13, 2025
Parent: 73f4670 · Commit: 96ab46b
Showing 3 changed files with 21 additions and 9 deletions.
ExpressionIndexSupport.scala
@@ -82,7 +82,7 @@ class ExpressionIndexSupport(spark: SparkSession,
       val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(fileIndex, prunedPartitionsAndFileSlices)
       val expressionIndexRecords = loadExpressionIndexRecords(indexPartition, prunedPartitions, readInMemory)
       loadTransposed(queryReferencedColumns, readInMemory, expressionIndexRecords, expressionIndexQuery) {
-        transposedColStatsDF =>Some(getCandidateFiles(transposedColStatsDF, Seq(expressionIndexQuery), prunedFileNames, isExpressionIndex = true))
+        transposedColStatsDF =>Some(getCandidateFiles(transposedColStatsDF, Seq(expressionIndexQuery), prunedFileNames, isExpressionIndex = true, Option.apply(indexDefinition)))
       }
     } else if (indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)) {
       val prunedPartitionAndFileNames = getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices, includeLogFiles = true)
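
This hunk threads the expression index's own HoodieIndexDefinition into getCandidateFiles (via Option.apply(indexDefinition)), so candidate-file pruning is driven by the expression index's source fields instead of always defaulting to the column-stats definition; the companion change in SparkBaseIndexSupport below adds the parameter and its fallback (see the sketch after that hunk).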
SparkBaseIndexSupport.scala
@@ -21,7 +21,7 @@ package org.apache.hudi
 import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieIndexDefinition}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.keygen.KeyGenUtils
 import org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR
@@ -102,8 +102,13 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
     (prunedPartitions, prunedFiles)
   }

-  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], fileNamesFromPrunedPartitions: Set[String], isExpressionIndex: Boolean = false): Set[String] = {
-    val indexedCols : Seq[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], fileNamesFromPrunedPartitions: Set[String],
+                                  isExpressionIndex: Boolean = false, indexDefinitionOpt: Option[HoodieIndexDefinition] = Option.empty): Set[String] = {
+    val indexedCols : Seq[String] = if (indexDefinitionOpt.isDefined) {
+      indexDefinitionOpt.get.getSourceFields.asScala.toSeq
+    } else {
+      metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+    }
     val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, isExpressionIndex, indexedCols)).reduce(And)
     if (indexFilter.equals(TrueLiteral)) {
       // if there are any non indexed cols or we can't translate source expr, we have to read all files and may not benefit from col stats lookup.
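
As an aside, a minimal sketch of the same fallback in a more idiomatic form, assuming only that HoodieIndexDefinition#getSourceFields returns a java.util.List[String] (which the .asScala.toSeq conversion above implies); resolveIndexedCols is a hypothetical helper, not part of this commit:

import scala.collection.JavaConverters._

import org.apache.hudi.common.model.HoodieIndexDefinition

// Prefer the caller-supplied definition (e.g. an expression index);
// otherwise fall back to the table's column-stats index definition.
def resolveIndexedCols(indexDefinitionOpt: Option[HoodieIndexDefinition],
                       colStatsIndexDefinition: HoodieIndexDefinition): Seq[String] =
  indexDefinitionOpt.getOrElse(colStatsIndexDefinition).getSourceFields.asScala.toSeq

Option.getOrElse collapses the isDefined/get branching into a single expression and avoids the unguarded .get call.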
TestExpressionIndex.scala
@@ -57,15 +57,14 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Column, SaveMode, functions}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
-import org.scalatest.Ignore

 import java.util.stream.Collectors
 import scala.collection.JavaConverters

-@Ignore
 class TestExpressionIndex extends HoodieSparkSqlTestBase {

   override protected def beforeAll(): Unit = {
     spark.sql("set hoodie.metadata.index.column.stats.enable=false")
+    initQueryIndexConf()
   }

@@ -777,6 +776,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
         |location '$basePath'
         |""".stripMargin)

+    setCompactionConfigs(tableType)
     spark.sql("set hoodie.parquet.small.file.limit=0")
     if (HoodieSparkUtils.gteqSpark3_4) {
       spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -856,6 +856,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
         |location '$basePath'
         |""".stripMargin)

+    setCompactionConfigs(tableType)
     spark.sql("set hoodie.parquet.small.file.limit=0")
     if (HoodieSparkUtils.gteqSpark3_4) {
       spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -1471,6 +1472,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
         |location '$basePath'
         |""".stripMargin)

+    setCompactionConfigs(tableType)
     spark.sql("set hoodie.parquet.small.file.limit=0")
     if (HoodieSparkUtils.gteqSpark3_4) {
       spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -1618,6 +1620,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
         |location '$basePath'
         |""".stripMargin)

+    setCompactionConfigs(tableType)
     spark.sql("set hoodie.parquet.small.file.limit=0")
     spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
     if (HoodieSparkUtils.gteqSpark3_4) {
@@ -1942,7 +1945,9 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
     if (shouldRollback) {
       // rollback the operation
       val lastCompletedInstant = metaClient.reloadActiveTimeline().getCommitsTimeline.filterCompletedInstants().lastInstant()
-      val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), getWriteConfig(Map.empty, metaClient.getBasePath.toString))
+      val writeConfig = getWriteConfig(Map.empty, metaClient.getBasePath.toString)
+      writeConfig.setValue("hoodie.metadata.index.column.stats.enable", "false")
+      val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), writeConfig)
       writeClient.rollback(lastCompletedInstant.get().requestedTime)
       // validate the expression index
       checkAnswer(metadataSql)(
@@ -1968,7 +1973,8 @@
       HoodieWriteConfig.TBL_NAME.key -> tableName,
       RECORDKEY_FIELD.key -> "c1",
       PRECOMBINE_FIELD.key -> "c1",
-      PARTITIONPATH_FIELD.key() -> "c8"
+      PARTITIONPATH_FIELD.key() -> "c8",
+      "hoodie.metadata.index.column.stats.enable" -> "false"
     )
     val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json-partition-pruning").toString

@@ -2055,7 +2061,8 @@
       PRECOMBINE_FIELD.key -> "c1",
       PARTITIONPATH_FIELD.key() -> "c8",
       // setting IndexType to be INMEMORY to simulate Global Index nature
-      HoodieIndexConfig.INDEX_TYPE.key -> HoodieIndex.IndexType.INMEMORY.name()
+      HoodieIndexConfig.INDEX_TYPE.key -> HoodieIndex.IndexType.INMEMORY.name(),
+      "hoodie.metadata.index.column.stats.enable" -> "false"
     )
     val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json-partition-pruning").toString
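
The test-side edits share one pattern: column-stats indexing in the metadata table is switched off (via SQL conf, write config, or writer options) so that only the expression index under test drives file pruning, and setCompactionConfigs(tableType) pins compaction settings per table type. As a rough sketch, the writer-options variant expands to plain string keys like the following (the table name is a placeholder; the keys are the ones behind the constants used above):

// Hypothetical writer options mirroring the test pattern in this commit.
val hudiOpts = Map(
  "hoodie.table.name" -> "test_table",                   // HoodieWriteConfig.TBL_NAME (placeholder value)
  "hoodie.datasource.write.recordkey.field" -> "c1",     // RECORDKEY_FIELD
  "hoodie.datasource.write.precombine.field" -> "c1",    // PRECOMBINE_FIELD
  "hoodie.datasource.write.partitionpath.field" -> "c8", // PARTITIONPATH_FIELD
  "hoodie.metadata.index.column.stats.enable" -> "false" // keep col-stats out of the pruning path
)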
