[GLUTEN-5953][VL] Prevent pushdown filters with unsupported data types to scan node #5954

Closed · wants to merge 9 commits
@@ -18,11 +18,13 @@ package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.velox.VeloxBackendSettings
import org.apache.gluten.benchmarks.RandomParquetDataGenerator
import org.apache.gluten.utils.VeloxFileSystemValidationJniWrapper

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.GreaterThan
import org.apache.spark.sql.execution.ScalarSubquery
import org.apache.spark.sql.types._

class VeloxScanSuite extends VeloxWholeStageTransformerSuite {
protected val rootPath: String = getClass.getResource("/").getPath
@@ -118,4 +120,34 @@ class VeloxScanSuite extends VeloxWholeStageTransformerSuite {
!VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
Array("file:/test_path/", "unsupported://test_path")))
}

test("unsupported data type scan filter pushdown") {
withTempView("t") {
withTempDir {
dir =>
val path = dir.getAbsolutePath
val schema = StructType(
Array(
StructField("short_decimal_field", DecimalType(5, 2), true),
StructField("long_decimal_field", DecimalType(32, 8), true),
StructField("binary_field", BinaryType, true),
StructField("timestamp_field", TimestampType, true)
))
RandomParquetDataGenerator(0).generateRandomData(spark, schema, 10, Some(path))
spark.catalog.createTable("t", path, "parquet")
runQueryAndCompare(
"""select * from t where long_decimal_field = 3.14""".stripMargin
)(checkGlutenOperatorMatch[FileSourceScanExecTransformer])
runQueryAndCompare(
"""select * from t where short_decimal_field = 3.14""".stripMargin
)(checkGlutenOperatorMatch[FileSourceScanExecTransformer])
runQueryAndCompare(
"""select * from t where binary_field = '3.14'""".stripMargin
)(checkGlutenOperatorMatch[FileSourceScanExecTransformer])
runQueryAndCompare(
"""select * from t where timestamp_field = current_timestamp()""".stripMargin
Contributor: We have made some updates to timestamp support. Could you help check whether the pushdown of timestamps is supported?

Contributor Author: Yes, I think it's already supported on the Velox side. But for now, Gluten can still fall back, since mapToFilters doesn't support it yet. I think we can support it in a follow-up PR.

Contributor: Got it. Would you like to open an issue to track its support? Thanks.

Contributor Author: Here is the issue: #6642

Contributor: Thanks.

)(checkGlutenOperatorMatch[FileSourceScanExecTransformer])
}
}
}
}
52 changes: 30 additions & 22 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -1600,17 +1600,26 @@ bool SubstraitToVeloxPlanConverter::childrenFunctionsOnSameField(
bool SubstraitToVeloxPlanConverter::canPushdownFunction(
    const ::substrait::Expression_ScalarFunction& scalarFunction,
    const std::string& filterName,
-    uint32_t& fieldIdx) {
-  // Condtions can be pushed down.
+    uint32_t& fieldIdx,
+    const std::vector<TypePtr>& veloxTypeList) {
+  // Conditions can be pushed down.
  static const std::unordered_set<std::string> supportedFunctions = {sIsNotNull, sIsNull, sGte, sGt, sLte, sLt, sEqual};

-  bool canPushdown = false;
-  if (supportedFunctions.find(filterName) != supportedFunctions.end() &&
-      fieldOrWithLiteral(scalarFunction.arguments(), fieldIdx)) {
-    // The arg should be field or field with literal.
-    canPushdown = true;
-  }
-  return canPushdown;
+  if (supportedFunctions.find(filterName) == supportedFunctions.end()) {
+    return false;
+  }
+
+  // The arg should be field or field with literal.
+  if (!fieldOrWithLiteral(scalarFunction.arguments(), fieldIdx)) {
+    return false;
+  }
+
+  // Check whether the data type is supported or not.
+  if (!veloxTypeList.empty() && fieldIdx < veloxTypeList.size() && !isPushdownSupported(veloxTypeList.at(fieldIdx))) {

Contributor: Is it a bug if fieldIdx >= veloxTypeList.size()? Shall we just add a check to ensure it does not happen?

Contributor Author: ok

+    return false;
+  }
+
+  return true;
}
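
For intuition, a minimal sketch (not part of this diff) of what an accepted condition later becomes. Assuming Velox's public common::BigintRange filter, the test query "short_decimal_field = 3.14" on a DECIMAL(5, 2) column is representable as a single-point range over the unscaled 64-bit value, since short decimals are stored as unscaled BIGINTs:

// Hedged sketch: the subfield filter for "short_decimal_field = 3.14".
#include "velox/type/Filter.h"

using namespace facebook::velox;

std::unique_ptr<common::BigintRange> shortDecimalEquals314() {
  // 3.14 at scale 2 is the unscaled value 314. Equality is a one-point
  // range, and an equality predicate never matches NULL.
  return std::make_unique<common::BigintRange>(314, 314, /*nullAllowed=*/false);
}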

bool SubstraitToVeloxPlanConverter::canPushdownNot(
@@ -1686,6 +1695,18 @@ bool SubstraitToVeloxPlanConverter::canPushdownOr(
return true;
}

bool SubstraitToVeloxPlanConverter::isPushdownSupported(TypePtr inputType) {
  // Keep in sync with mapToFilters.
  switch (inputType->kind()) {
    case TypeKind::TIMESTAMP:
    case TypeKind::VARBINARY:
    case TypeKind::HUGEINT:
      return false;
    default:
      return true;
  }
}
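
For intuition, a minimal sketch (not part of this diff) of why these three kinds cover the new test cases: Spark's DecimalType(32, 8) maps to a Velox long decimal backed by HUGEINT, BinaryType maps to VARBINARY, and TimestampType maps to TIMESTAMP, while a short decimal such as DecimalType(5, 2) is backed by BIGINT and remains eligible for pushdown. Assuming Velox's public type factories:

// Hedged sketch: the Velox type kinds behind the Spark types used in the test.
#include <cassert>

#include "velox/type/Type.h"

using namespace facebook::velox;

int main() {
  assert(DECIMAL(5, 2)->kind() == TypeKind::BIGINT);   // short decimal: still pushed down
  assert(DECIMAL(32, 8)->kind() == TypeKind::HUGEINT); // long decimal: excluded
  assert(VARBINARY()->kind() == TypeKind::VARBINARY);  // excluded
  assert(TIMESTAMP()->kind() == TypeKind::TIMESTAMP);  // excluded
  return 0;
}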

void SubstraitToVeloxPlanConverter::separateFilters(
std::vector<RangeRecorder>& rangeRecorders,
const std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions,
@@ -1712,19 +1733,6 @@
for (const auto& scalarFunction : scalarFunctions) {
auto filterNameSpec = SubstraitParser::findFunctionSpec(functionMap_, scalarFunction.function_reference());
auto filterName = SubstraitParser::getNameBeforeDelimiter(filterNameSpec);
-    // Add all decimal filters to remaining functions because their pushdown are not supported.
-    if (format == dwio::common::FileFormat::ORC && scalarFunction.arguments().size() > 0) {

Contributor: Looks like the previous logic only excluded the decimal type for the ORC format, but now we are excluding HUGEINT for all file formats. Could you confirm whether this is expected?

Contributor: The ORC format already supports short-decimal predicate pushdown. @jiangjiangtian Can you help test this?

Contributor: I verified that short-decimal predicates can be pushed down, as follows:
(screenshot omitted)

Contributor Author: The main idea for these three types (Timestamp/VarBinary/HugeInt) in this PR is that if we don't put them in remainingFilter, then when we call mapToFilters, the exception "Subfield filters creation not supported for input type '{}' in mapToFilters" will be thrown, which will cause the scan to fall back.

The reason mapToFilters doesn't support these three types is that there is no MultiRange type or related type traits defined, which has nothing to do with the file format. cc @rui-mo @jiangjiangtian @kecookier

Contributor: That means we don't actually support pushdown of long decimals in Parquet and ORC. Is that correct?

Contributor Author: @rui-mo Currently, yes. But supporting a HugeintMultiRange on the Velox side is not a big issue; I can work on it later.

-      auto value = scalarFunction.arguments().at(0).value();
-      if (value.has_selection()) {
-        uint32_t fieldIndex;
-        bool parsed = SubstraitParser::parseReferenceSegment(value.selection().direct_reference(), fieldIndex);
-        if (!parsed || (!veloxTypeList.empty() && veloxTypeList.at(fieldIndex)->isDecimal())) {
-          remainingFunctions.emplace_back(scalarFunction);
-          continue;
-        }
-      }
-    }

// Check whether NOT and OR functions can be pushed down.
// If yes, the scalar function will be added into the subfield functions.
if (filterName == sNot) {
@@ -1742,7 +1750,7 @@
} else {
// Check if the condition is supported to be pushed down.
uint32_t fieldIdx;
-      if (canPushdownFunction(scalarFunction, filterName, fieldIdx) &&
+      if (canPushdownFunction(scalarFunction, filterName, fieldIdx, veloxTypeList) &&
rangeRecorders.at(fieldIdx).setCertainRangeForFunction(filterName)) {
subfieldFunctions.emplace_back(scalarFunction);
} else {
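
To make the fallback path discussed above concrete, a minimal sketch (not Gluten's actual mapToFilters code; the function name and signature are illustrative, and only the error text is quoted from the review thread) of the guard that throws when a filter on one of the excluded kinds reaches subfield-filter creation:

// Hedged sketch of the failure mode this PR avoids by routing such filters
// into remainingFunctions, where they are evaluated as a generic remaining
// filter instead of a subfield filter.
#include "velox/common/base/Exceptions.h"
#include "velox/type/Type.h"

using namespace facebook::velox;

void checkSubfieldFilterSupported(const TypePtr& type) {
  switch (type->kind()) {
    case TypeKind::TIMESTAMP:
    case TypeKind::VARBINARY:
    case TypeKind::HUGEINT:
      // No MultiRange filter type exists for these kinds, so creation fails
      // and the whole scan falls back to vanilla Spark.
      VELOX_FAIL(
          "Subfield filters creation not supported for input type '{}' in mapToFilters",
          type->toString());
    default:
      break; // supported kinds proceed to subfield-filter creation
  }
}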
6 changes: 5 additions & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -451,7 +451,8 @@ class SubstraitToVeloxPlanConverter {
static bool canPushdownFunction(
const ::substrait::Expression_ScalarFunction& scalarFunction,
const std::string& filterName,
-      uint32_t& fieldIdx);
+      uint32_t& fieldIdx,
+      const std::vector<TypePtr>& veloxTypeList);

/// Returns whether a NOT function can be pushed down.
bool canPushdownNot(
@@ -473,6 +474,9 @@
/// 'or' expression are effective on the same column.
static bool childrenFunctionsOnSameField(const ::substrait::Expression_ScalarFunction& function);

  /// Check whether the data type is supported for pushdown.
  static bool isPushdownSupported(TypePtr inputType);

/// Extract the scalar function, and set the filter info for different types
/// of columns. If reverse is true, the opposite filter info will be set.
void setFilterInfo(
@@ -22,7 +22,7 @@ import org.apache.spark.sql.types._

import com.github.javafaker.Faker

-import java.sql.Date
+import java.sql.{Date, Timestamp}
import java.util.Random

case class RandomParquetDataGenerator(initialSeed: Long = 0L) extends Logging {
@@ -67,7 +67,7 @@ case class RandomParquetDataGenerator(initialSeed: Long = 0L) extends Logging {
case DoubleType =>
faker.number().randomDouble(2, Double.MinValue.toLong, Double.MaxValue.toLong)
case DateType => new Date(faker.date().birthday().getTime)
-      // case TimestampType => new Timestamp(faker.date().birthday().getTime)
+      case TimestampType => new Timestamp(faker.date().birthday().getTime)
case t: DecimalType =>
BigDecimal(
faker.number().randomDouble(t.scale, 0, Math.pow(10, t.precision - t.scale).toLong))
@@ -124,7 +124,7 @@ case class RandomParquetDataGenerator(initialSeed: Long = 0L) extends Logging {
() => StructField(fieldName, FloatType, nullable = true),
() => StructField(fieldName, DoubleType, nullable = true),
() => StructField(fieldName, DateType, nullable = true),
-      // () => StructField(fieldName, TimestampType, nullable = true),
+      () => StructField(fieldName, TimestampType, nullable = true),
() => StructField(fieldName, DecimalType(10, 2), nullable = true),
() => StructField(fieldName, DecimalType(30, 10), nullable = true)
)