From 75623013713cccd195fa4c9efe0e05a90e4c99af Mon Sep 17 00:00:00 2001 From: Kapil Kumar Singh <106726387+mskapilks@users.noreply.github.com> Date: Tue, 23 Jan 2024 06:35:33 +0530 Subject: [PATCH] [VL] Fix filter push down on short column (#4221) Currently, when there is a filter on a short column, it doesn't get pushed down to Velox and the whole scan is executed in vanilla Spark. Ex: create table abc (a short, b int) using parquet select * from abc where a = 1 This PR addresses this issue so that the scan can now run in Velox. Additional changes were needed because there was a bug with an edge case on short filters such as a > -32768, which could not filter out the value -32768 (Fixes: #4222) Previous plan for the above query: VeloxColumnarToRowExec +- ^(1) FilterExecTransformer (isnotnull(a#120) AND (a#120 = 1)), false +- RowToVeloxColumnar +- *(1) ColumnarToRow +- FileScan parquet default.abc[a#120,b#121] Batched: true, DataFilters: [isnotnull(a#120), (a#120 = 1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/kapil/dev/src/Gluten/gluten-ut/spark33/target/scala-2.12/te..., PartitionFilters: [], PushedFilters: [IsNotNull(a), EqualTo(a,1)], ReadSchema: struct New plan: VeloxColumnarToRowExec +- ^(1) FilterExecTransformer (isnotnull(a#120) AND (a#120 = 1)), false +- ^(1) NativeFileNativeScan parquet default.abc[a#120,b#121] Batched: true, DataFilters: [isnotnull(a#120), (a#120 = 1)], --- .../execution/TestOperator.scala | 48 +++++++++++++++++++ .../VeloxOrcDataTypeValidationSuite.scala | 8 ++++ .../VeloxParquetDataTypeValidationSuite.scala | 8 ++++ cpp/velox/substrait/SubstraitToVeloxPlan.cc | 36 ++++++++++---- 4 files changed, 90 insertions(+), 10 deletions(-) diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala b/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala index fc59b2f8de62..c0c82bfdbe77 100644 --- a/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala +++ 
b/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala @@ -734,6 +734,54 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla } } + test("Support short int type filter in scan") { + withTable("short_table") { + sql("create table short_table (a short, b int) using parquet") + sql( + s"insert into short_table values " + + s"(1, 1), (null, 2), (${Short.MinValue}, 3), (${Short.MaxValue}, 4)") + runQueryAndCompare("select * from short_table where a = 1") { + checkOperatorMatch[FileSourceScanExecTransformer] + } + + runQueryAndCompare("select * from short_table where a is NULL") { + checkOperatorMatch[FileSourceScanExecTransformer] + } + + runQueryAndCompare(s"select * from short_table where a != ${Short.MinValue}") { + checkOperatorMatch[FileSourceScanExecTransformer] + } + + runQueryAndCompare(s"select * from short_table where a != ${Short.MaxValue}") { + checkOperatorMatch[FileSourceScanExecTransformer] + } + } + } + + test("Support int type filter in scan") { + withTable("int_table") { + sql("create table int_table (a int, b int) using parquet") + sql( + s"insert into int_table values " + + s"(1, 1), (null, 2), (${Int.MinValue}, 3), (${Int.MaxValue}, 4)") + runQueryAndCompare("select * from int_table where a = 1") { + checkOperatorMatch[FileSourceScanExecTransformer] + } + + runQueryAndCompare("select * from int_table where a is NULL") { + checkOperatorMatch[FileSourceScanExecTransformer] + } + + runQueryAndCompare(s"select * from int_table where a != ${Int.MinValue}") { + checkOperatorMatch[FileSourceScanExecTransformer] + } + + runQueryAndCompare(s"select * from int_table where a != ${Int.MaxValue}") { + checkOperatorMatch[FileSourceScanExecTransformer] + } + } + } + test("test cross join with equi join conditions") { withTable("t1", "t2") { sql(""" diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxOrcDataTypeValidationSuite.scala 
b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxOrcDataTypeValidationSuite.scala index 4c8a13a51fc5..3317649571cd 100644 --- a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxOrcDataTypeValidationSuite.scala +++ b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxOrcDataTypeValidationSuite.scala @@ -193,6 +193,14 @@ class VeloxOrcDataTypeValidationSuite extends VeloxWholeStageTransformerSuite { } test("Short type") { + // Validation: BatchScan with Filter + runQueryAndCompare( + "select type1.short, int from type1" + + " where type1.short = 1", + false) { + checkOperatorMatch[BatchScanExecTransformer] + } + // Validation: BatchScan Project Aggregate Expand Sort Limit runQueryAndCompare( "select int, short from type1 " + diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxParquetDataTypeValidationSuite.scala b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxParquetDataTypeValidationSuite.scala index 8cd5bcdb8cc8..4b954975b206 100644 --- a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxParquetDataTypeValidationSuite.scala +++ b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxParquetDataTypeValidationSuite.scala @@ -192,6 +192,14 @@ class VeloxParquetDataTypeValidationSuite extends VeloxWholeStageTransformerSuit } test("Short type") { + // Validation: BatchScan with Filter + runQueryAndCompare( + "select type1.short, int from type1" + + " where type1.short = 1", + false) { + checkOperatorMatch[BatchScanExecTransformer] + } + // Validation: BatchScan Project Aggregate Expand Sort Limit runQueryAndCompare( "select int, short from type1 " + diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index aeb575cd431a..68e612c1feac 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -1776,7 +1776,7 @@ void SubstraitToVeloxPlanConverter::setFilterInfo( break; case 
TypeKind::SMALLINT: if (substraitLit) { - val = variant(substraitLit.value().i16()); + val = variant(static_cast(substraitLit.value().i16())); } break; case TypeKind::INTEGER: @@ -1850,8 +1850,10 @@ void SubstraitToVeloxPlanConverter::createNotEqualFilter( // Value > lower std::unique_ptr lowerFilter; if constexpr (std::is_same_v) { - lowerFilter = std::make_unique( - notVariant.value() + 1 /*lower*/, getMax() /*upper*/, nullAllowed); + if (notVariant.value() < getMax()) { + lowerFilter = std::make_unique( + notVariant.value() + 1 /*lower*/, getMax() /*upper*/, nullAllowed); + } } else { lowerFilter = std::make_unique( notVariant.value() /*lower*/, @@ -1866,8 +1868,10 @@ void SubstraitToVeloxPlanConverter::createNotEqualFilter( // Value < upper std::unique_ptr upperFilter; if constexpr (std::is_same_v) { - upperFilter = std::make_unique( - getLowest() /*lower*/, notVariant.value() - 1 /*upper*/, nullAllowed); + if (getLowest() < notVariant.value()) { + upperFilter = std::make_unique( + getLowest() /*lower*/, notVariant.value() - 1 /*upper*/, nullAllowed); + } } else { upperFilter = std::make_unique( getLowest() /*lower*/, @@ -1881,8 +1885,12 @@ void SubstraitToVeloxPlanConverter::createNotEqualFilter( // To avoid overlap of BigintMultiRange, keep this appending order to make sure lower bound of one range is less than // the upper bounds of others. - colFilters.emplace_back(std::move(upperFilter)); - colFilters.emplace_back(std::move(lowerFilter)); + if (upperFilter) { + colFilters.emplace_back(std::move(upperFilter)); + } + if (lowerFilter) { + colFilters.emplace_back(std::move(lowerFilter)); + } } template @@ -2080,10 +2088,18 @@ void SubstraitToVeloxPlanConverter::constructSubfieldFilters( // due to multirange is in 'OR' relation but 'AND' is needed. 
VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot be supported after not-equal filter."); if constexpr (std::is_same_v) { - filters[common::Subfield(inputName)] = - std::make_unique(std::move(colFilters), nullAllowed, true /*nanAllowed*/); + if (colFilters.size() == 1) { + filters[common::Subfield(inputName)] = std::move(colFilters.front()); + } else { + filters[common::Subfield(inputName)] = + std::make_unique(std::move(colFilters), nullAllowed, true /*nanAllowed*/); + } } else { - filters[common::Subfield(inputName)] = std::make_unique(std::move(colFilters), nullAllowed); + if (colFilters.size() == 1) { + filters[common::Subfield(inputName)] = std::move(colFilters.front()); + } else { + filters[common::Subfield(inputName)] = std::make_unique(std::move(colFilters), nullAllowed); + } } return; }