Skip to content

Commit

Permalink
[VL] Remove self-registered tokenizer (apache#6713)
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo authored and shamirchen committed Oct 14, 2024
1 parent c234ffb commit 2a1a0f1
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 113 deletions.
1 change: 0 additions & 1 deletion cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ set(VELOX_SRCS
memory/VeloxMemoryManager.cc
operators/functions/RegistrationAllFunctions.cc
operators/functions/RowConstructorWithNull.cc
operators/functions/SparkTokenizer.cc
operators/serializer/VeloxColumnarToRowConverter.cc
operators/serializer/VeloxColumnarBatchSerializer.cc
operators/serializer/VeloxRowToColumnarConverter.cc
Expand Down
2 changes: 0 additions & 2 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include "compute/VeloxRuntime.h"
#include "config/VeloxConfig.h"
#include "jni/JniFileSystem.h"
#include "operators/functions/SparkTokenizer.h"
#include "udf/UdfLoader.h"
#include "utils/exception.h"
#include "velox/common/caching/SsdCache.h"
Expand Down Expand Up @@ -124,7 +123,6 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
velox::exec::Operator::registerOperator(std::make_unique<RowVectorStreamOperatorTranslator>());

initUdf();
registerSparkTokenizer();

// Initialize the global memory manager for current process.
auto sparkOverhead = backendConf_->get<int64_t>(kSparkOverheadMemory);
Expand Down
58 changes: 0 additions & 58 deletions cpp/velox/operators/functions/SparkTokenizer.cc

This file was deleted.

24 changes: 0 additions & 24 deletions cpp/velox/operators/functions/SparkTokenizer.h

This file was deleted.

73 changes: 45 additions & 28 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,19 @@ RowTypePtr getJoinOutputType(
}
VELOX_FAIL("Output should include left or right columns.");
}

// Returns the field name separators used to create Subfield.
std::shared_ptr<common::Separators> getSeparators() {
auto separators = std::make_shared<common::Separators>();
// ']', '.', '[', '*', '^' are not separators in Spark.
separators->closeBracket = '\0';
separators->dot = '\0';
separators->openBracket = '\0';
separators->wildCard = '\0';
separators->unicodeCaret = '\0';
return separators;
}

} // namespace

core::PlanNodePtr SubstraitToVeloxPlanConverter::processEmit(
Expand Down Expand Up @@ -2027,9 +2040,9 @@ void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::BIGINT>(
values.emplace_back(value);
}
if (negated) {
filters[common::Subfield(inputName)] = common::createNegatedBigintValues(values, nullAllowed);
filters[common::Subfield(inputName, getSeparators())] = common::createNegatedBigintValues(values, nullAllowed);
} else {
filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed);
filters[common::Subfield(inputName, getSeparators())] = common::createBigintValues(values, nullAllowed);
}
}

Expand All @@ -2049,9 +2062,9 @@ void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::INTEGER>(
values.emplace_back(value);
}
if (negated) {
filters[common::Subfield(inputName)] = common::createNegatedBigintValues(values, nullAllowed);
filters[common::Subfield(inputName, getSeparators())] = common::createNegatedBigintValues(values, nullAllowed);
} else {
filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed);
filters[common::Subfield(inputName, getSeparators())] = common::createBigintValues(values, nullAllowed);
}
}

Expand All @@ -2071,9 +2084,9 @@ void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::SMALLINT>(
values.emplace_back(value);
}
if (negated) {
filters[common::Subfield(inputName)] = common::createNegatedBigintValues(values, nullAllowed);
filters[common::Subfield(inputName, getSeparators())] = common::createNegatedBigintValues(values, nullAllowed);
} else {
filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed);
filters[common::Subfield(inputName, getSeparators())] = common::createBigintValues(values, nullAllowed);
}
}

Expand All @@ -2093,9 +2106,9 @@ void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::TINYINT>(
values.emplace_back(value);
}
if (negated) {
filters[common::Subfield(inputName)] = common::createNegatedBigintValues(values, nullAllowed);
filters[common::Subfield(inputName, getSeparators())] = common::createNegatedBigintValues(values, nullAllowed);
} else {
filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed);
filters[common::Subfield(inputName, getSeparators())] = common::createBigintValues(values, nullAllowed);
}
}

Expand All @@ -2113,9 +2126,10 @@ void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::VARCHAR>(
values.emplace_back(value);
}
if (negated) {
filters[common::Subfield(inputName)] = std::make_unique<common::NegatedBytesValues>(values, nullAllowed);
filters[common::Subfield(inputName, getSeparators())] =
std::make_unique<common::NegatedBytesValues>(values, nullAllowed);
} else {
filters[common::Subfield(inputName)] = std::make_unique<common::BytesValues>(values, nullAllowed);
filters[common::Subfield(inputName, getSeparators())] = std::make_unique<common::BytesValues>(values, nullAllowed);
}
}

Expand All @@ -2128,7 +2142,7 @@ void SubstraitToVeloxPlanConverter::setSubfieldFilter(
using MultiRangeType = typename RangeTraits<KIND>::MultiRangeType;

if (colFilters.size() == 1) {
filters[common::Subfield(inputName)] = std::move(colFilters[0]);
filters[common::Subfield(inputName, getSeparators())] = std::move(colFilters[0]);
} else if (colFilters.size() > 1) {
// BigintMultiRange should have been sorted
if (colFilters[0]->kind() == common::FilterKind::kBigintRange) {
Expand All @@ -2138,10 +2152,11 @@ void SubstraitToVeloxPlanConverter::setSubfieldFilter(
});
}
if constexpr (std::is_same_v<MultiRangeType, common::MultiRange>) {
filters[common::Subfield(inputName)] =
filters[common::Subfield(inputName, getSeparators())] =
std::make_unique<common::MultiRange>(std::move(colFilters), nullAllowed, true /*nanAllowed*/);
} else {
filters[common::Subfield(inputName)] = std::make_unique<MultiRangeType>(std::move(colFilters), nullAllowed);
filters[common::Subfield(inputName, getSeparators())] =
std::make_unique<MultiRangeType>(std::move(colFilters), nullAllowed);
}
}
}
Expand Down Expand Up @@ -2169,25 +2184,26 @@ void SubstraitToVeloxPlanConverter::constructSubfieldFilters(
// Handle bool type filters.
// Not equal.
if (filterInfo.notValue_) {
filters[common::Subfield(inputName)] =
filters[common::Subfield(inputName, getSeparators())] =
std::make_unique<common::BoolValue>(!filterInfo.notValue_.value().value<bool>(), nullAllowed);
} else if (filterInfo.notValues_.size() > 0) {
std::set<bool> notValues;
for (auto v : filterInfo.notValues_) {
notValues.emplace(v.value<bool>());
}
if (notValues.size() == 1) {
filters[common::Subfield(inputName)] = std::make_unique<common::BoolValue>(!(*notValues.begin()), nullAllowed);
filters[common::Subfield(inputName, getSeparators())] =
std::make_unique<common::BoolValue>(!(*notValues.begin()), nullAllowed);
} else {
// If there is more than one distinct value in the NOT IN list, the filter should be AlwaysFalse.
filters[common::Subfield(inputName)] = std::make_unique<common::AlwaysFalse>();
filters[common::Subfield(inputName, getSeparators())] = std::make_unique<common::AlwaysFalse>();
}
} else if (rangeSize == 0) {
// IsNull/IsNotNull.
if (!nullAllowed) {
filters[common::Subfield(inputName)] = std::make_unique<common::IsNotNull>();
filters[common::Subfield(inputName, getSeparators())] = std::make_unique<common::IsNotNull>();
} else if (isNull) {
filters[common::Subfield(inputName)] = std::make_unique<common::IsNull>();
filters[common::Subfield(inputName, getSeparators())] = std::make_unique<common::IsNull>();
} else {
VELOX_NYI("Only IsNotNull and IsNull are supported in constructSubfieldFilters when no other filter ranges.");
}
Expand All @@ -2196,17 +2212,17 @@ void SubstraitToVeloxPlanConverter::constructSubfieldFilters(
// Equal.
auto value = filterInfo.lowerBounds_[0].value().value<bool>();
VELOX_CHECK(value == filterInfo.upperBounds_[0].value().value<bool>(), "invalid state of bool equal");
filters[common::Subfield(inputName)] = std::make_unique<common::BoolValue>(value, nullAllowed);
filters[common::Subfield(inputName, getSeparators())] = std::make_unique<common::BoolValue>(value, nullAllowed);
}
} else if constexpr (
KIND == facebook::velox::TypeKind::ARRAY || KIND == facebook::velox::TypeKind::MAP ||
KIND == facebook::velox::TypeKind::ROW) {
// Only IsNotNull and IsNull are supported for complex types.
VELOX_CHECK_EQ(rangeSize, 0, "Only IsNotNull and IsNull are supported for complex type.");
if (!nullAllowed) {
filters[common::Subfield(inputName)] = std::make_unique<common::IsNotNull>();
filters[common::Subfield(inputName, getSeparators())] = std::make_unique<common::IsNotNull>();
} else if (isNull) {
filters[common::Subfield(inputName)] = std::make_unique<common::IsNull>();
filters[common::Subfield(inputName, getSeparators())] = std::make_unique<common::IsNull>();
} else {
VELOX_NYI("Only IsNotNull and IsNull are supported for input type '{}'.", inputType->toString());
}
Expand Down Expand Up @@ -2250,16 +2266,17 @@ void SubstraitToVeloxPlanConverter::constructSubfieldFilters(
VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot be supported after not-equal filter.");
if constexpr (std::is_same_v<MultiRangeType, common::MultiRange>) {
if (colFilters.size() == 1) {
filters[common::Subfield(inputName)] = std::move(colFilters.front());
filters[common::Subfield(inputName, getSeparators())] = std::move(colFilters.front());
} else {
filters[common::Subfield(inputName)] =
filters[common::Subfield(inputName, getSeparators())] =
std::make_unique<common::MultiRange>(std::move(colFilters), nullAllowed, true /*nanAllowed*/);
}
} else {
if (colFilters.size() == 1) {
filters[common::Subfield(inputName)] = std::move(colFilters.front());
filters[common::Subfield(inputName, getSeparators())] = std::move(colFilters.front());
} else {
filters[common::Subfield(inputName)] = std::make_unique<MultiRangeType>(std::move(colFilters), nullAllowed);
filters[common::Subfield(inputName, getSeparators())] =
std::make_unique<MultiRangeType>(std::move(colFilters), nullAllowed);
}
}
return;
Expand All @@ -2269,11 +2286,11 @@ void SubstraitToVeloxPlanConverter::constructSubfieldFilters(
if (rangeSize == 0) {
// Handle the case where IsNotNull and IsNull both exist at the same time.
if (existIsNullAndIsNotNull) {
filters[common::Subfield(inputName)] = std::move(std::make_unique<common::AlwaysFalse>());
filters[common::Subfield(inputName, getSeparators())] = std::move(std::make_unique<common::AlwaysFalse>());
} else if (!nullAllowed) {
filters[common::Subfield(inputName)] = std::make_unique<common::IsNotNull>();
filters[common::Subfield(inputName, getSeparators())] = std::make_unique<common::IsNotNull>();
} else if (isNull) {
filters[common::Subfield(inputName)] = std::make_unique<common::IsNull>();
filters[common::Subfield(inputName, getSeparators())] = std::make_unique<common::IsNull>();
} else {
VELOX_NYI("Only IsNotNull and IsNull are supported in constructSubfieldFilters when no other filter ranges.");
}
Expand Down

0 comments on commit 2a1a0f1

Please sign in to comment.