Skip to content

Commit

Permalink
[VL] Fix filter push down on short column (#4221)
Browse files Browse the repository at this point in the history
Currently when there is a filter on short column it doesn't get pushed to Velox and the whole scan is executed in vanilla spark.
Ex:
create table abc (a short, b int) using parquet
select * from abc where a = 1


This PR address this issue so we can have scan now in Velox.
Needed additional changes as there was bug with edge case on short filter like a > -32768 which could not filter the value -32768
(Fixes: #4222)
Prev plan for above query:

VeloxColumnarToRowExec
+- ^(1) FilterExecTransformer (isnotnull(a#120) AND (a#120 = 1)), false
   +- RowToVeloxColumnar
      +- *(1) ColumnarToRow
         +- FileScan parquet default.abc[a#120,b#121] Batched: true, DataFilters: [isnotnull(a#120), (a#120 = 1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/kapil/dev/src/Gluten/gluten-ut/spark33/target/scala-2.12/te..., PartitionFilters: [], PushedFilters: [IsNotNull(a), EqualTo(a,1)], ReadSchema: struct<a:smallint,b:int>
New Plan

VeloxColumnarToRowExec
+- ^(1) FilterExecTransformer (isnotnull(a#120) AND (a#120 = 1)), false
   +- ^(1) NativeFileNativeScan parquet default.abc[a#120,b#121] Batched: true, DataFilters: [isnotnull(a#120), (a#120 = 1)],
  • Loading branch information
mskapilks authored Jan 23, 2024
1 parent 6b849a5 commit 7562301
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,54 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
}
}

test("Support short int type filter in scan") {
withTable("short_table") {
sql("create table short_table (a short, b int) using parquet")
sql(
s"insert into short_table values " +
s"(1, 1), (null, 2), (${Short.MinValue}, 3), (${Short.MaxValue}, 4)")
runQueryAndCompare("select * from short_table where a = 1") {
checkOperatorMatch[FileSourceScanExecTransformer]
}

runQueryAndCompare("select * from short_table where a is NULL") {
checkOperatorMatch[FileSourceScanExecTransformer]
}

runQueryAndCompare(s"select * from short_table where a != ${Short.MinValue}") {
checkOperatorMatch[FileSourceScanExecTransformer]
}

runQueryAndCompare(s"select * from short_table where a != ${Short.MaxValue}") {
checkOperatorMatch[FileSourceScanExecTransformer]
}
}
}

test("Support int type filter in scan") {
withTable("int_table") {
sql("create table int_table (a int, b int) using parquet")
sql(
s"insert into int_table values " +
s"(1, 1), (null, 2), (${Int.MinValue}, 3), (${Int.MaxValue}, 4)")
runQueryAndCompare("select * from int_table where a = 1") {
checkOperatorMatch[FileSourceScanExecTransformer]
}

runQueryAndCompare("select * from int_table where a is NULL") {
checkOperatorMatch[FileSourceScanExecTransformer]
}

runQueryAndCompare(s"select * from int_table where a != ${Int.MinValue}") {
checkOperatorMatch[FileSourceScanExecTransformer]
}

runQueryAndCompare(s"select * from int_table where a != ${Int.MaxValue}") {
checkOperatorMatch[FileSourceScanExecTransformer]
}
}
}

test("test cross join with equi join conditions") {
withTable("t1", "t2") {
sql("""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ class VeloxOrcDataTypeValidationSuite extends VeloxWholeStageTransformerSuite {
}

test("Short type") {
// Validation: BatchScan with Filter
runQueryAndCompare(
"select type1.short, int from type1" +
" where type1.short = 1",
false) {
checkOperatorMatch[BatchScanExecTransformer]
}

// Validation: BatchScan Project Aggregate Expand Sort Limit
runQueryAndCompare(
"select int, short from type1 " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ class VeloxParquetDataTypeValidationSuite extends VeloxWholeStageTransformerSuit
}

test("Short type") {
// Validation: BatchScan with Filter
runQueryAndCompare(
"select type1.short, int from type1" +
" where type1.short = 1",
false) {
checkOperatorMatch[BatchScanExecTransformer]
}

// Validation: BatchScan Project Aggregate Expand Sort Limit
runQueryAndCompare(
"select int, short from type1 " +
Expand Down
36 changes: 26 additions & 10 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1776,7 +1776,7 @@ void SubstraitToVeloxPlanConverter::setFilterInfo(
break;
case TypeKind::SMALLINT:
if (substraitLit) {
val = variant(substraitLit.value().i16());
val = variant(static_cast<int16_t>(substraitLit.value().i16()));
}
break;
case TypeKind::INTEGER:
Expand Down Expand Up @@ -1850,8 +1850,10 @@ void SubstraitToVeloxPlanConverter::createNotEqualFilter(
// Value > lower
std::unique_ptr<FilterType> lowerFilter;
if constexpr (std::is_same_v<RangeType, common::BigintRange>) {
lowerFilter = std::make_unique<common::BigintRange>(
notVariant.value<NativeType>() + 1 /*lower*/, getMax<NativeType>() /*upper*/, nullAllowed);
if (notVariant.value<NativeType>() < getMax<NativeType>()) {
lowerFilter = std::make_unique<common::BigintRange>(
notVariant.value<NativeType>() + 1 /*lower*/, getMax<NativeType>() /*upper*/, nullAllowed);
}
} else {
lowerFilter = std::make_unique<RangeType>(
notVariant.value<NativeType>() /*lower*/,
Expand All @@ -1866,8 +1868,10 @@ void SubstraitToVeloxPlanConverter::createNotEqualFilter(
// Value < upper
std::unique_ptr<FilterType> upperFilter;
if constexpr (std::is_same_v<RangeType, common::BigintRange>) {
upperFilter = std::make_unique<common::BigintRange>(
getLowest<NativeType>() /*lower*/, notVariant.value<NativeType>() - 1 /*upper*/, nullAllowed);
if (getLowest<NativeType>() < notVariant.value<NativeType>()) {
upperFilter = std::make_unique<common::BigintRange>(
getLowest<NativeType>() /*lower*/, notVariant.value<NativeType>() - 1 /*upper*/, nullAllowed);
}
} else {
upperFilter = std::make_unique<RangeType>(
getLowest<NativeType>() /*lower*/,
Expand All @@ -1881,8 +1885,12 @@ void SubstraitToVeloxPlanConverter::createNotEqualFilter(

// To avoid overlap of BigintMultiRange, keep this appending order to make sure lower bound of one range is less than
// the upper bounds of others.
colFilters.emplace_back(std::move(upperFilter));
colFilters.emplace_back(std::move(lowerFilter));
if (upperFilter) {
colFilters.emplace_back(std::move(upperFilter));
}
if (lowerFilter) {
colFilters.emplace_back(std::move(lowerFilter));
}
}

template <TypeKind KIND>
Expand Down Expand Up @@ -2080,10 +2088,18 @@ void SubstraitToVeloxPlanConverter::constructSubfieldFilters(
// due to multirange is in 'OR' relation but 'AND' is needed.
VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot be supported after not-equal filter.");
if constexpr (std::is_same_v<MultiRangeType, common::MultiRange>) {
filters[common::Subfield(inputName)] =
std::make_unique<common::MultiRange>(std::move(colFilters), nullAllowed, true /*nanAllowed*/);
if (colFilters.size() == 1) {
filters[common::Subfield(inputName)] = std::move(colFilters.front());
} else {
filters[common::Subfield(inputName)] =
std::make_unique<common::MultiRange>(std::move(colFilters), nullAllowed, true /*nanAllowed*/);
}
} else {
filters[common::Subfield(inputName)] = std::make_unique<MultiRangeType>(std::move(colFilters), nullAllowed);
if (colFilters.size() == 1) {
filters[common::Subfield(inputName)] = std::move(colFilters.front());
} else {
filters[common::Subfield(inputName)] = std::make_unique<MultiRangeType>(std::move(colFilters), nullAllowed);
}
}
return;
}
Expand Down

0 comments on commit 7562301

Please sign in to comment.