diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 1d7e8548aa04..f872df47bf01 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -184,7 +184,6 @@ set(VELOX_SRCS memory/VeloxMemoryManager.cc operators/functions/RegistrationAllFunctions.cc operators/functions/RowConstructorWithNull.cc - operators/functions/SparkTokenizer.cc operators/serializer/VeloxColumnarToRowConverter.cc operators/serializer/VeloxColumnarBatchSerializer.cc operators/serializer/VeloxRowToColumnarConverter.cc diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index dcfadc42c0c9..7b7d6c8a6685 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -33,7 +33,6 @@ #include "compute/VeloxRuntime.h" #include "config/VeloxConfig.h" #include "jni/JniFileSystem.h" -#include "operators/functions/SparkTokenizer.h" #include "udf/UdfLoader.h" #include "utils/exception.h" #include "velox/common/caching/SsdCache.h" @@ -124,7 +123,6 @@ void VeloxBackend::init(const std::unordered_map& conf velox::exec::Operator::registerOperator(std::make_unique()); initUdf(); - registerSparkTokenizer(); // Initialize the global memory manager for current process. auto sparkOverhead = backendConf_->get(kSparkOverheadMemory); diff --git a/cpp/velox/operators/functions/SparkTokenizer.cc b/cpp/velox/operators/functions/SparkTokenizer.cc deleted file mode 100644 index 952abc0c6e9c..000000000000 --- a/cpp/velox/operators/functions/SparkTokenizer.cc +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "operators/functions/SparkTokenizer.h" -#include "velox/type/Tokenizer.h" - -namespace gluten { -namespace { - -class SparkTokenizer : public facebook::velox::common::Tokenizer { - public: - explicit SparkTokenizer(const std::string& path) : path_(path) { - state_ = State::kNotReady; - } - - bool hasNext() override { - if (state_ == State::kDone) { - return false; - } else if (state_ == State::kNotReady) { - return true; - } - VELOX_FAIL("Illegal state."); - } - - std::unique_ptr next() override { - if (!hasNext()) { - VELOX_USER_FAIL("No more tokens."); - } - state_ = State::kDone; - return std::make_unique(path_); - } - - private: - const std::string path_; - State state_; -}; -} // namespace - -void registerSparkTokenizer() { - facebook::velox::common::Tokenizer::registerInstanceFactory( - [](const std::string& p) { return std::make_unique(p); }); -} - -} // namespace gluten diff --git a/cpp/velox/operators/functions/SparkTokenizer.h b/cpp/velox/operators/functions/SparkTokenizer.h deleted file mode 100644 index 9ed267f47898..000000000000 --- a/cpp/velox/operators/functions/SparkTokenizer.h +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -namespace gluten { - -void registerSparkTokenizer(); - -} // namespace gluten diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index bd545a960947..67676b0354e4 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -178,6 +178,19 @@ RowTypePtr getJoinOutputType( } VELOX_FAIL("Output should include left or right columns."); } + +// Returns the field name separators used to create Subfield. +std::shared_ptr getSeparators() { + auto separators = std::make_shared(); + // ']', '.', '[', '*', '^' are not separators in Spark. + separators->closeBracket = '\0'; + separators->dot = '\0'; + separators->openBracket = '\0'; + separators->wildCard = '\0'; + separators->unicodeCaret = '\0'; + return separators; +} + } // namespace core::PlanNodePtr SubstraitToVeloxPlanConverter::processEmit( @@ -2027,9 +2040,9 @@ void SubstraitToVeloxPlanConverter::setInFilter( values.emplace_back(value); } if (negated) { - filters[common::Subfield(inputName)] = common::createNegatedBigintValues(values, nullAllowed); + filters[common::Subfield(inputName, getSeparators())] = common::createNegatedBigintValues(values, nullAllowed); } else { - filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed); + filters[common::Subfield(inputName, getSeparators())] = common::createBigintValues(values, nullAllowed); } } @@ -2049,9 +2062,9 @@ void SubstraitToVeloxPlanConverter::setInFilter( values.emplace_back(value); } if (negated) { - filters[common::Subfield(inputName)] = common::createNegatedBigintValues(values, nullAllowed); + filters[common::Subfield(inputName, getSeparators())] = common::createNegatedBigintValues(values, nullAllowed); } else { - filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed); + filters[common::Subfield(inputName, getSeparators())] = common::createBigintValues(values, nullAllowed); } } @@ -2071,9 +2084,9 @@ void SubstraitToVeloxPlanConverter::setInFilter( values.emplace_back(value); } if (negated) { - filters[common::Subfield(inputName)] = common::createNegatedBigintValues(values, nullAllowed); + filters[common::Subfield(inputName, getSeparators())] = common::createNegatedBigintValues(values, nullAllowed); } else { - filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed); + filters[common::Subfield(inputName, getSeparators())] = common::createBigintValues(values, nullAllowed); } } @@ -2093,9 +2106,9 @@ void SubstraitToVeloxPlanConverter::setInFilter( values.emplace_back(value); } if (negated) { - filters[common::Subfield(inputName)] = common::createNegatedBigintValues(values, nullAllowed); + filters[common::Subfield(inputName, getSeparators())] = common::createNegatedBigintValues(values, nullAllowed); } else { - filters[common::Subfield(inputName)] = common::createBigintValues(values, nullAllowed); + filters[common::Subfield(inputName, getSeparators())] = common::createBigintValues(values, nullAllowed); } } @@ -2113,9 +2126,10 @@ void SubstraitToVeloxPlanConverter::setInFilter( values.emplace_back(value); } if (negated) { - filters[common::Subfield(inputName)] = std::make_unique(values, nullAllowed); + filters[common::Subfield(inputName, getSeparators())] = + std::make_unique(values, nullAllowed); } else { - filters[common::Subfield(inputName)] = std::make_unique(values, nullAllowed); + filters[common::Subfield(inputName, getSeparators())] = std::make_unique(values, nullAllowed); } } @@ -2128,7 +2142,7 @@ void SubstraitToVeloxPlanConverter::setSubfieldFilter( using MultiRangeType = typename RangeTraits::MultiRangeType; if (colFilters.size() == 1) { - filters[common::Subfield(inputName)] = std::move(colFilters[0]); + filters[common::Subfield(inputName, getSeparators())] = std::move(colFilters[0]); } else if (colFilters.size() > 1) { // BigintMultiRange should have been sorted if (colFilters[0]->kind() == common::FilterKind::kBigintRange) { @@ -2138,10 +2152,11 @@ void SubstraitToVeloxPlanConverter::setSubfieldFilter( }); } if constexpr (std::is_same_v) { - filters[common::Subfield(inputName)] = + filters[common::Subfield(inputName, getSeparators())] = std::make_unique(std::move(colFilters), nullAllowed, true /*nanAllowed*/); } else { - filters[common::Subfield(inputName)] = std::make_unique(std::move(colFilters), nullAllowed); + filters[common::Subfield(inputName, getSeparators())] = + std::make_unique(std::move(colFilters), nullAllowed); } } } @@ -2169,7 +2184,7 @@ void SubstraitToVeloxPlanConverter::constructSubfieldFilters( // Handle bool type filters. // Not equal. if (filterInfo.notValue_) { - filters[common::Subfield(inputName)] = + filters[common::Subfield(inputName, getSeparators())] = std::make_unique(!filterInfo.notValue_.value().value(), nullAllowed); } else if (filterInfo.notValues_.size() > 0) { std::set notValues; @@ -2177,17 +2192,18 @@ void SubstraitToVeloxPlanConverter::constructSubfieldFilters( notValues.emplace(v.value()); } if (notValues.size() == 1) { - filters[common::Subfield(inputName)] = std::make_unique(!(*notValues.begin()), nullAllowed); + filters[common::Subfield(inputName, getSeparators())] = + std::make_unique(!(*notValues.begin()), nullAllowed); } else { // if there are more than one distinct value in NOT IN list, the filter should be AlwaysFalse - filters[common::Subfield(inputName)] = std::make_unique(); + filters[common::Subfield(inputName, getSeparators())] = std::make_unique(); } } else if (rangeSize == 0) { // IsNull/IsNotNull. if (!nullAllowed) { - filters[common::Subfield(inputName)] = std::make_unique(); + filters[common::Subfield(inputName, getSeparators())] = std::make_unique(); } else if (isNull) { - filters[common::Subfield(inputName)] = std::make_unique(); + filters[common::Subfield(inputName, getSeparators())] = std::make_unique(); } else { VELOX_NYI("Only IsNotNull and IsNull are supported in constructSubfieldFilters when no other filter ranges."); } @@ -2196,7 +2212,7 @@ void SubstraitToVeloxPlanConverter::constructSubfieldFilters( // Equal. auto value = filterInfo.lowerBounds_[0].value().value(); VELOX_CHECK(value == filterInfo.upperBounds_[0].value().value(), "invalid state of bool equal"); - filters[common::Subfield(inputName)] = std::make_unique(value, nullAllowed); + filters[common::Subfield(inputName, getSeparators())] = std::make_unique(value, nullAllowed); } } else if constexpr ( KIND == facebook::velox::TypeKind::ARRAY || KIND == facebook::velox::TypeKind::MAP || @@ -2204,9 +2220,9 @@ void SubstraitToVeloxPlanConverter::constructSubfieldFilters( // Only IsNotNull and IsNull are supported for complex types. VELOX_CHECK_EQ(rangeSize, 0, "Only IsNotNull and IsNull are supported for complex type."); if (!nullAllowed) { - filters[common::Subfield(inputName)] = std::make_unique(); + filters[common::Subfield(inputName, getSeparators())] = std::make_unique(); } else if (isNull) { - filters[common::Subfield(inputName)] = std::make_unique(); + filters[common::Subfield(inputName, getSeparators())] = std::make_unique(); } else { VELOX_NYI("Only IsNotNull and IsNull are supported for input type '{}'.", inputType->toString()); } @@ -2250,16 +2266,17 @@ void SubstraitToVeloxPlanConverter::constructSubfieldFilters( VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot be supported after not-equal filter."); if constexpr (std::is_same_v) { if (colFilters.size() == 1) { - filters[common::Subfield(inputName)] = std::move(colFilters.front()); + filters[common::Subfield(inputName, getSeparators())] = std::move(colFilters.front()); } else { - filters[common::Subfield(inputName)] = + filters[common::Subfield(inputName, getSeparators())] = std::make_unique(std::move(colFilters), nullAllowed, true /*nanAllowed*/); } } else { if (colFilters.size() == 1) { - filters[common::Subfield(inputName)] = std::move(colFilters.front()); + filters[common::Subfield(inputName, getSeparators())] = std::move(colFilters.front()); } else { - filters[common::Subfield(inputName)] = std::make_unique(std::move(colFilters), nullAllowed); + filters[common::Subfield(inputName, getSeparators())] = + std::make_unique(std::move(colFilters), nullAllowed); } } return; @@ -2269,11 +2286,11 @@ void SubstraitToVeloxPlanConverter::constructSubfieldFilters( if (rangeSize == 0) { // handle is not null and is null exists at same time if (existIsNullAndIsNotNull) { - filters[common::Subfield(inputName)] = std::move(std::make_unique()); + filters[common::Subfield(inputName, getSeparators())] = std::move(std::make_unique()); } else if (!nullAllowed) { - filters[common::Subfield(inputName)] = std::make_unique(); + filters[common::Subfield(inputName, getSeparators())] = std::make_unique(); } else if (isNull) { - filters[common::Subfield(inputName)] = std::make_unique(); + filters[common::Subfield(inputName, getSeparators())] = std::make_unique(); } else { VELOX_NYI("Only IsNotNull and IsNull are supported in constructSubfieldFilters when no other filter ranges."); }