Skip to content

Commit

Permalink
[GLUTEN-5965][VL] Support the pushdown "NOT IN" filter (#5966)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangGuangxin authored Jun 14, 2024
1 parent f1bb1d6 commit c4566eb
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,71 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
checkLengthAndPlan(df, 60141)
}

test("not in") {
  // Helper: run the query through runQueryAndCompare with the
  // checkGlutenOperatorMatch[FileSourceScanExecTransformer] check and hand back the result.
  def runWithScanCheck(query: String) =
    runQueryAndCompare(query) {
      checkGlutenOperatorMatch[FileSourceScanExecTransformer]
    }

  val fromLineitem = "select l_orderkey from lineitem "

  // Integral column: a single NOT IN list, two AND-ed NOT IN lists, and NOT IN AND-ed
  // with `!=` all exclude the same three keys, so each uses the same expected length.
  val singleListDf =
    runWithScanCheck(fromLineitem + "where l_partkey not in (1552, 674, 1062)")
  checkLengthAndPlan(singleListDf, 60053)

  val splitListDf = runWithScanCheck(
    fromLineitem + "where l_partkey not in (1552, 674) and l_partkey not in (1062)")
  checkLengthAndPlan(splitListDf, 60053)

  val listAndNotEqualDf = runWithScanCheck(
    fromLineitem + "where l_partkey not in (1552, 674) and l_partkey != 1062")
  checkLengthAndPlan(listAndNotEqualDf, 60053)

  // String column.
  val stringDf =
    runWithScanCheck("select o_orderstatus from orders where o_orderstatus not in ('O', 'F')")
  checkLengthAndPlan(stringDf, 363)

  // Boolean column, using a temporary parquet table that also holds a null row.
  withTable("t") {
    sql("create table t (id int, b boolean) using parquet")
    sql("insert into t values (1, true), (2, false), (3, null)")
    Seq(
      "select * from t where b not in (true)",
      "select * from t where b not in (true, false)"
    ).foreach(query => runWithScanCheck(query))
  }

  // NOT IN mixed with a range (AND), with IN (AND), and with IN (OR), in that order.
  Seq(
    "where l_partkey not in (1552, 674) and l_partkey >= 1552",
    "where l_partkey not in (1552, 674) and l_partkey in (1552)",
    "where l_partkey not in (1552, 674) or l_partkey in (1552)"
  ).foreach(predicate => runWithScanCheck(fromLineitem + predicate))
}

test("coalesce") {
var df = runQueryAndCompare(
"select l_orderkey, coalesce(l_comment, 'default_val') " +
Expand Down
110 changes: 84 additions & 26 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "VariantToVectorConverter.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/exec/TableWriter.h"
#include "velox/type/Filter.h"
#include "velox/type/Type.h"

#include "utils/ConfigExtractor.h"
Expand Down Expand Up @@ -1465,10 +1466,12 @@ connector::hive::SubfieldFilters SubstraitToVeloxPlanConverter::createSubfieldFi
auto expr = scalarFunction.arguments()[0].value();
if (expr.has_scalar_function()) {
// Set its child to filter info with reverse enabled.
setFilterInfo(scalarFunction.arguments()[0].value().scalar_function(), inputTypeList, columnToFilterInfo, true);
setFilterInfo(expr.scalar_function(), inputTypeList, columnToFilterInfo, true);
} else if (expr.has_singular_or_list()) {
auto singularOrList = expr.singular_or_list();
setFilterInfo(singularOrList, columnToFilterInfo, true);
} else {
// TODO: support push down of Not In.
VELOX_NYI("Scalar function expected.");
VELOX_NYI("Only support push down Not with scalar function or In.");
}
} else if (filterName == sOr) {
VELOX_CHECK(scalarFunction.arguments().size() == 2);
Expand Down Expand Up @@ -1593,24 +1596,26 @@ bool SubstraitToVeloxPlanConverter::canPushdownNot(
std::vector<RangeRecorder>& rangeRecorders) {
VELOX_CHECK(scalarFunction.arguments().size() == 1, "Only one arg is expected for Not.");
const auto& notArg = scalarFunction.arguments()[0];
if (!notArg.value().has_scalar_function()) {
// Not for a Boolean Literal or Or List is not supported currently.
// It can be pushed down with an AlwaysTrue or AlwaysFalse Range.
return false;
}

auto argFunction =
SubstraitParser::findFunctionSpec(functionMap_, notArg.value().scalar_function().function_reference());
auto functionName = SubstraitParser::getNameBeforeDelimiter(argFunction);
if (notArg.value().has_singular_or_list()) {
auto singularOrList = notArg.value().singular_or_list();
if (!canPushdownSingularOrList(singularOrList)) {
return false;
}
uint32_t colIdx = getColumnIndexFromSingularOrList(singularOrList);
return rangeRecorders.at(colIdx).setInRange();
} else if (notArg.value().has_scalar_function()) {
auto argFunction =
SubstraitParser::findFunctionSpec(functionMap_, notArg.value().scalar_function().function_reference());
auto functionName = SubstraitParser::getNameBeforeDelimiter(argFunction);

static const std::unordered_set<std::string> supportedNotFunctions = {sGte, sGt, sLte, sLt, sEqual};
static const std::unordered_set<std::string> supportedNotFunctions = {sGte, sGt, sLte, sLt, sEqual};

uint32_t fieldIdx;
bool isFieldOrWithLiteral = fieldOrWithLiteral(notArg.value().scalar_function().arguments(), fieldIdx);
uint32_t fieldIdx;
bool isFieldOrWithLiteral = fieldOrWithLiteral(notArg.value().scalar_function().arguments(), fieldIdx);

if (supportedNotFunctions.find(functionName) != supportedNotFunctions.end() && isFieldOrWithLiteral &&
rangeRecorders.at(fieldIdx).setCertainRangeForFunction(functionName, true /*reverse*/)) {
return true;
return (
supportedNotFunctions.find(functionName) != supportedNotFunctions.end() && isFieldOrWithLiteral &&
rangeRecorders.at(fieldIdx).setCertainRangeForFunction(functionName, true /*reverse*/));
}
return false;
}
Expand Down Expand Up @@ -1966,13 +1971,15 @@ template <TypeKind KIND>
void SubstraitToVeloxPlanConverter::setInFilter(
const std::vector<variant>& variants,
bool nullAllowed,
bool negated,
const std::string& inputName,
connector::hive::SubfieldFilters& filters) {}

template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::BIGINT>(
const std::vector<variant>& variants,
bool nullAllowed,
bool negated,
const std::string& inputName,
connector::hive::SubfieldFilters& filters) {
std::vector<int64_t> values;
Expand All @@ -1981,13 +1988,18 @@ void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::BIGINT>(
int64_t value = variant.value<int64_t>();
values.emplace_back(value);
}
filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed);
if (negated) {
filters[common::Subfield(inputName)] = common::createNegatedBigintValues(values, nullAllowed);
} else {
filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed);
}
}

template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::INTEGER>(
const std::vector<variant>& variants,
bool nullAllowed,
bool negated,
const std::string& inputName,
connector::hive::SubfieldFilters& filters) {
// Use bigint values for int type.
Expand All @@ -1998,13 +2010,18 @@ void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::INTEGER>(
int64_t value = variant.value<int32_t>();
values.emplace_back(value);
}
filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed);
if (negated) {
filters[common::Subfield(inputName)] = common::createNegatedBigintValues(values, nullAllowed);
} else {
filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed);
}
}

template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::SMALLINT>(
const std::vector<variant>& variants,
bool nullAllowed,
bool negated,
const std::string& inputName,
connector::hive::SubfieldFilters& filters) {
// Use bigint values for small int type.
Expand All @@ -2015,13 +2032,18 @@ void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::SMALLINT>(
int64_t value = variant.value<int16_t>();
values.emplace_back(value);
}
filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed);
if (negated) {
filters[common::Subfield(inputName)] = common::createNegatedBigintValues(values, nullAllowed);
} else {
filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed);
}
}

template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::TINYINT>(
const std::vector<variant>& variants,
bool nullAllowed,
bool negated,
const std::string& inputName,
connector::hive::SubfieldFilters& filters) {
// Use bigint values for tiny int type.
Expand All @@ -2032,13 +2054,18 @@ void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::TINYINT>(
int64_t value = variant.value<int8_t>();
values.emplace_back(value);
}
filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed);
if (negated) {
filters[common::Subfield(inputName)] = common::createNegatedBigintValues(values, nullAllowed);
} else {
filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed);
}
}

template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::VARCHAR>(
const std::vector<variant>& variants,
bool nullAllowed,
bool negated,
const std::string& inputName,
connector::hive::SubfieldFilters& filters) {
std::vector<std::string> values;
Expand All @@ -2047,7 +2074,11 @@ void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::VARCHAR>(
std::string value = variant.value<std::string>();
values.emplace_back(value);
}
filters[common::Subfield(inputName)] = std::make_unique<common::BytesValues>(values, nullAllowed);
if (negated) {
filters[common::Subfield(inputName)] = std::make_unique<common::NegatedBytesValues>(values, nullAllowed);
} else {
filters[common::Subfield(inputName)] = std::make_unique<common::BytesValues>(values, nullAllowed);
}
}

template <TypeKind KIND, typename FilterType>
Expand Down Expand Up @@ -2102,6 +2133,17 @@ void SubstraitToVeloxPlanConverter::constructSubfieldFilters(
if (filterInfo.notValue_) {
filters[common::Subfield(inputName)] =
std::make_unique<common::BoolValue>(!filterInfo.notValue_.value().value<bool>(), nullAllowed);
} else if (filterInfo.notValues_.size() > 0) {
std::set<bool> notValues;
for (auto v : filterInfo.notValues_) {
notValues.emplace(v.value<bool>());
}
if (notValues.size() == 1) {
filters[common::Subfield(inputName)] = std::make_unique<common::BoolValue>(!(*notValues.begin()), nullAllowed);
} else {
// If there is more than one distinct value in the NOT IN list, the filter should be AlwaysFalse.
filters[common::Subfield(inputName)] = std::make_unique<common::AlwaysFalse>();
}
} else if (rangeSize == 0) {
// IsNull/IsNotNull.
if (!nullAllowed) {
Expand Down Expand Up @@ -2140,11 +2182,22 @@ void SubstraitToVeloxPlanConverter::constructSubfieldFilters(
if (filterInfo.values_.size() > 0) {
// To filter out null is a default behaviour of Spark IN expression.
nullAllowed = false;
setInFilter<KIND>(filterInfo.values_, nullAllowed, inputName, filters);
setInFilter<KIND>(filterInfo.values_, nullAllowed, false, inputName, filters);
// Currently, In cannot coexist with other filter conditions
// due to multirange is in 'OR' relation but 'AND' is needed.
VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot be supported after IN filter.");
VELOX_CHECK(!filterInfo.notValue_.has_value(), "Not equal cannot be supported after IN filter.");
VELOX_CHECK(filterInfo.notValues_.size() == 0, "Not in cannot be supported after IN filter.");
return;
}

// Handle not in filter.
if (filterInfo.notValues_.size() > 0) {
setInFilter<KIND>(filterInfo.notValues_, filterInfo.nullAllowed_, true, inputName, filters);
// Currently, NOT In cannot coexist with other filter conditions
// due to multirange is in 'OR' relation but 'AND' is needed.
VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot be supported after NOT IN filter.");
VELOX_CHECK(!filterInfo.notValue_.has_value(), "Not equal cannot be supported after NOT IN filter.");
return;
}

Expand Down Expand Up @@ -2429,7 +2482,8 @@ uint32_t SubstraitToVeloxPlanConverter::getColumnIndexFromSingularOrList(

void SubstraitToVeloxPlanConverter::setFilterInfo(
const ::substrait::Expression_SingularOrList& singularOrList,
std::vector<FilterInfo>& columnToFilterInfo) {
std::vector<FilterInfo>& columnToFilterInfo,
bool reverse) {
VELOX_CHECK(singularOrList.options_size() > 0, "At least one option is expected.");
// Get the column index.
uint32_t colIdx = getColumnIndexFromSingularOrList(singularOrList);
Expand All @@ -2443,7 +2497,11 @@ void SubstraitToVeloxPlanConverter::setFilterInfo(
variants.emplace_back(exprConverter_->toVeloxExpr(option.literal())->value());
}
// Set the value list to filter info.
columnToFilterInfo[colIdx].setValues(variants);
if (!reverse) {
columnToFilterInfo[colIdx].setValues(variants);
} else {
columnToFilterInfo[colIdx].setNotValues(variants);
}
}

} // namespace gluten
23 changes: 20 additions & 3 deletions cpp/velox/substrait/SubstraitToVeloxPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,16 @@ class SubstraitToVeloxPlanConverter {
}
}

// Set a list of values to be used in the push down of 'not in' expression.
void setNotValues(const std::vector<variant>& notValues) {
for (const auto& value : notValues) {
notValues_.emplace_back(value);
}
if (!initialized_) {
initialized_ = true;
}
}

// Whether this filter map is initialized.
bool initialized_ = false;

Expand All @@ -402,6 +412,9 @@ class SubstraitToVeloxPlanConverter {

// The list of values used in 'in' expression.
std::vector<variant> values_;

// The list of values used in 'not in' expression.
std::vector<variant> notValues_;
};

/// Returns unique ID to use for plan node. Produces sequential numbers
Expand Down Expand Up @@ -464,9 +477,11 @@ class SubstraitToVeloxPlanConverter {
bool reverse = false);

/// Extract SingularOrList and set it to the filter info map.
/// If reverse is true, the opposite filter info will be set.
void setFilterInfo(
const ::substrait::Expression_SingularOrList& singularOrList,
std::vector<FilterInfo>& columnToFilterInfo);
std::vector<FilterInfo>& columnToFilterInfo,
bool reverse = false);

/// Extract SingularOrList and returns the field index.
static uint32_t getColumnIndexFromSingularOrList(const ::substrait::Expression_SingularOrList&);
Expand All @@ -484,13 +499,15 @@ class SubstraitToVeloxPlanConverter {
template <TypeKind KIND, typename FilterType>
void createNotEqualFilter(variant notVariant, bool nullAllowed, std::vector<std::unique_ptr<FilterType>>& colFilters);

/// Create a values range to handle in filter.
/// variants: the list of values extracted from the in expression.
/// Create a values range to handle (not) in filter.
/// variants: the list of values extracted from the (not) in expression.
/// negated: false for IN filter, true for NOT IN filter.
/// inputName: the column input name.
template <TypeKind KIND>
void setInFilter(
const std::vector<variant>& variants,
bool nullAllowed,
bool negated,
const std::string& inputName,
connector::hive::SubfieldFilters& filters);

Expand Down

0 comments on commit c4566eb

Please sign in to comment.