From e6ab754713b81ec6a320c424760c0d2aae80cfbd Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Fri, 3 Nov 2023 09:04:17 +0800 Subject: [PATCH 01/20] Issues calling reclaimer / arbitrator APIs in single-thread execution (5790) --- velox/core/QueryCtx.h | 8 ++++++++ velox/exec/Task.cpp | 21 ++++++++------------- velox/exec/tests/TaskTest.cpp | 2 +- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index bab975301c4e..dfb33f1df1bb 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -147,6 +147,14 @@ class QueryCtx { } } + folly::Executor* executor0() const { + if (executor_ != nullptr) { + return executor_; + } + auto executor = executorKeepalive_.get(); + return executor; + } + const std::string queryId_; folly::Executor* const executor_{nullptr}; folly::Executor* const spillExecutor_{nullptr}; diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index ced22d540cc6..686ff34b58bc 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -558,12 +558,6 @@ RowVectorPtr Task::next(ContinueFuture* future) { createSplitGroupStateLocked(kUngroupedGroupId); std::vector> drivers = createDriversLocked(kUngroupedGroupId); - if (pool_->stats().currentBytes != 0) { - VELOX_FAIL( - "Unexpected memory pool allocations during task[{}] driver initialization: {}", - taskId_, - pool_->treeMemoryUsage()); - } drivers_ = std::move(drivers); } @@ -725,12 +719,6 @@ void Task::createAndStartDrivers(uint32_t concurrentSplitGroups) { // Create drivers. std::vector> drivers = createDriversLocked(kUngroupedGroupId); - if (pool_->stats().currentBytes != 0) { - VELOX_FAIL( - "Unexpected memory pool allocations during task[{}] driver initialization: {}", - taskId_, - pool_->treeMemoryUsage()); - } // Prevent the connecting structures from being cleaned up before all // split groups are finished during the grouped execution mode. @@ -860,9 +848,16 @@ void Task::resume(std::shared_ptr self) { continue; } VELOX_CHECK(!driver->isOnThread() && !driver->isTerminated()); - if (!driver->state().hasBlockingFuture) { + if (!driver->state().hasBlockingFuture && + driver->task()->queryCtx()->isExecutorSupplied()) { // Do not continue a Driver that is blocked on external // event. The Driver gets enqueued by the promise realization. + // + // Do not continue the driver if no executor is supplied, + // Since it's likely that we are in single-thread execution. + // + // 2023/07.13 Hongze: Is there a way to hide the execution model + // (single or async) from here? Driver::enqueue(driver); } } diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 37c9e5618395..3d97fd769dc3 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -1337,7 +1337,7 @@ DEBUG_ONLY_TEST_F(TaskTest, raceBetweenTaskPauseAndTerminate) { taskThread.join(); } -TEST_F(TaskTest, driverCreationMemoryAllocationCheck) { +TEST_F(TaskTest, DISABLED_driverCreationMemoryAllocationCheck) { exec::Operator::registerOperator(std::make_unique()); auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), From 47525c42ea91b2cd5b8ea8aea9a0e48149acec72 Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Thu, 9 Nov 2023 06:34:06 +0000 Subject: [PATCH 02/20] Fix array_union on NaN (7086) --- velox/docs/functions/spark/array.rst | 9 + velox/functions/sparksql/ArrayUnionFunction.h | 89 ++++++++ velox/functions/sparksql/Register.cpp | 21 ++ .../sparksql/tests/ArrayUnionTest.cpp | 208 ++++++++++++++++++ velox/functions/sparksql/tests/CMakeLists.txt | 1 + 5 files changed, 328 insertions(+) create mode 100644 velox/functions/sparksql/ArrayUnionFunction.h create mode 100644 velox/functions/sparksql/tests/ArrayUnionTest.cpp diff --git a/velox/docs/functions/spark/array.rst b/velox/docs/functions/spark/array.rst index f80e13923dc0..280945fabe9c 100644 --- a/velox/docs/functions/spark/array.rst +++ b/velox/docs/functions/spark/array.rst @@ -74,6 +74,15 @@ Array Functions SELECT array_sort(ARRAY [NULL, 1, NULL]); -- [1, NULL, NULL] SELECT array_sort(ARRAY [NULL, 2, 1]); -- [1, 2, NULL] +.. spark:function:: array_union(array(E), array(E1)) -> array(E2) + + Returns an array of the elements in the union of array1 and array2, without duplicates. :: + + SELECT array_union(array(1, 2, 3), array(1, 3, 5)); -- [1, 2, 3, 5] + SELECT array_union(array(1, 3, 5), array(1, 2, 3)); -- [1, 3, 5, 2] + SELECT array_union(array(1, 2, 3), array(1, 3, 5, null)); -- [1, 2, 3, 5, null] + SELECT array_union(array(1, 2, NaN), array(1, 3, NaN)); -- [1, 2, NaN, 3] + .. spark:function:: concat(array(E), array(E1), ..., array(En)) -> array(E, E1, ..., En) Returns the concatenation of array(E), array(E1), ..., array(En). :: diff --git a/velox/functions/sparksql/ArrayUnionFunction.h b/velox/functions/sparksql/ArrayUnionFunction.h new file mode 100644 index 000000000000..c8a4f21f5af2 --- /dev/null +++ b/velox/functions/sparksql/ArrayUnionFunction.h @@ -0,0 +1,89 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace facebook::velox::functions::sparksql { + +/// This class implements the array union function. +/// +/// DEFINITION: +/// array_union(x, y) → array +/// Returns an array of the elements in the union of x and y, without +/// duplicates. +template +struct ArrayUnionFunction { + VELOX_DEFINE_FUNCTION_TYPES(T) + + // Fast path for primitives. + template + void call(Out& out, const In& inputArray1, const In& inputArray2) { + folly::F14FastSet elementSet; + bool nullAdded = false; + bool nanAdded = false; + auto addItems = [&](auto& inputArray) { + for (const auto& item : inputArray) { + if (item.has_value()) { + if constexpr ( + std::is_same_v>> || + std::is_same_v>>) { + bool isNaN = std::isnan(item.value()); + if ((isNaN && !nanAdded) || + (!isNaN && elementSet.insert(item.value()).second)) { + auto& newItem = out.add_item(); + newItem = item.value(); + } + if (!nanAdded && isNaN) { + nanAdded = true; + } + } else if (elementSet.insert(item.value()).second) { + auto& newItem = out.add_item(); + newItem = item.value(); + } + } else if (!nullAdded) { + nullAdded = true; + out.add_null(); + } + } + }; + addItems(inputArray1); + addItems(inputArray2); + } + + void call( + out_type>>& out, + const arg_type>>& inputArray1, + const arg_type>>& inputArray2) { + folly::F14FastSet elementSet; + bool nullAdded = false; + auto addItems = [&](auto& inputArray) { + for (const auto& item : inputArray) { + if (item.has_value()) { + if (elementSet.insert(item.value()).second) { + auto& newItem = out.add_item(); + newItem.copy_from(item.value()); + } + } else if (!nullAdded) { + nullAdded = true; + out.add_null(); + } + } + }; + addItems(inputArray1); + addItems(inputArray2); + } +}; +} // namespace facebook::velox::functions::sparksql diff --git a/velox/functions/sparksql/Register.cpp b/velox/functions/sparksql/Register.cpp index bd748af7dadf..d22778b013cf 100644 --- a/velox/functions/sparksql/Register.cpp +++ b/velox/functions/sparksql/Register.cpp @@ -27,6 +27,7 @@ #include "velox/functions/prestosql/StringFunctions.h" #include "velox/functions/sparksql/ArrayMinMaxFunction.h" #include "velox/functions/sparksql/ArraySort.h" +#include "velox/functions/sparksql/ArrayUnionFunction.h" #include "velox/functions/sparksql/Bitwise.h" #include "velox/functions/sparksql/DateTimeFunctions.h" #include "velox/functions/sparksql/Hash.h" @@ -119,6 +120,12 @@ inline void registerArrayMinMaxFunctions(const std::string& prefix) { } } // namespace +template +inline void registerArrayUnionFunctions(const std::string& prefix) { + registerFunction, Array, Array>( + {prefix + "array_union"}); +} + void registerFunctions(const std::string& prefix) { registerAllSpecialFormGeneralFunctions(); @@ -330,6 +337,20 @@ void registerFunctions(const std::string& prefix) { prefix + "unscaled_value", unscaledValueSignatures(), makeUnscaledValue()); + + registerArrayUnionFunctions(prefix); + registerArrayUnionFunctions(prefix); + registerArrayUnionFunctions(prefix); + registerArrayUnionFunctions(prefix); + registerArrayUnionFunctions(prefix); + registerArrayUnionFunctions(prefix); + registerArrayUnionFunctions(prefix); + registerArrayUnionFunctions(prefix); + registerArrayUnionFunctions(prefix); + registerArrayUnionFunctions(prefix); + registerArrayUnionFunctions(prefix); + registerArrayUnionFunctions(prefix); + registerArrayUnionFunctions>(prefix); } } // namespace sparksql diff --git a/velox/functions/sparksql/tests/ArrayUnionTest.cpp b/velox/functions/sparksql/tests/ArrayUnionTest.cpp new file mode 100644 index 000000000000..e75719bc5a15 --- /dev/null +++ b/velox/functions/sparksql/tests/ArrayUnionTest.cpp @@ -0,0 +1,208 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/functions/sparksql/tests/SparkFunctionBaseTest.h" + +using namespace facebook::velox; +using namespace facebook::velox::test; + +namespace facebook::velox::functions::sparksql::test { +namespace { + +class ArrayUnionTest : public SparkFunctionBaseTest { + protected: + void testExpression( + const std::string& expression, + const std::vector& input, + const VectorPtr& expected) { + auto result = evaluate(expression, makeRowVector(input)); + assertEqualVectors(expected, result); + } + + template + void testFloatArray() { + const auto array1 = makeArrayVector( + {{1.99, 2.78, 3.98, 4.01}, + {3.89, 4.99, 5.13}, + {7.13, 8.91, std::numeric_limits::quiet_NaN()}, + {10.02, 20.01, std::numeric_limits::quiet_NaN()}}); + const auto array2 = makeArrayVector( + {{2.78, 4.01, 5.99}, + {3.89, 4.99, 5.13}, + {7.13, 8.91, std::numeric_limits::quiet_NaN()}, + {40.99, 50.12}}); + + VectorPtr expected; + expected = makeArrayVector({ + {1.99, 2.78, 3.98, 4.01, 5.99}, + {3.89, 4.99, 5.13}, + {7.13, 8.91, std::numeric_limits::quiet_NaN()}, + {10.02, 20.01, std::numeric_limits::quiet_NaN(), 40.99, 50.12}, + }); + testExpression("array_union(c0, c1)", {array1, array2}, expected); + + expected = makeArrayVector({ + {2.78, 4.01, 5.99, 1.99, 3.98}, + {3.89, 4.99, 5.13}, + {7.13, 8.91, std::numeric_limits::quiet_NaN()}, + {40.99, 50.12, 10.02, 20.01, std::numeric_limits::quiet_NaN()}, + }); + testExpression("array_union(c0, c1)", {array2, array1}, expected); + } +}; + +// Union two integer arrays. +TEST_F(ArrayUnionTest, intArray) { + const auto array1 = makeArrayVector( + {{1, 2, 3, 4}, {3, 4, 5}, {7, 8, 9}, {10, 20, 30}}); + const auto array2 = + makeArrayVector({{2, 4, 5}, {3, 4, 5}, {}, {40, 50}}); + VectorPtr expected; + + expected = makeArrayVector({ + {1, 2, 3, 4, 5}, + {3, 4, 5}, + {7, 8, 9}, + {10, 20, 30, 40, 50}, + }); + testExpression("array_union(c0, c1)", {array1, array2}, expected); + + expected = makeArrayVector({ + {2, 4, 5, 1, 3}, + {3, 4, 5}, + {7, 8, 9}, + {40, 50, 10, 20, 30}, + }); + testExpression("array_union(c0, c1)", {array2, array1}, expected); +} + +// Union two float or double arrays. +TEST_F(ArrayUnionTest, floatArray) { + testFloatArray(); + testFloatArray(); +} + +// Union two string arrays. +TEST_F(ArrayUnionTest, stringArray) { + const auto array1 = + makeArrayVector({{"foo", "bar"}, {"foo", "baz"}}); + const auto array2 = + makeArrayVector({{"foo", "bar"}, {"bar", "baz"}}); + VectorPtr expected; + + expected = makeArrayVector({ + {"foo", "bar"}, + {"foo", "baz", "bar"}, + }); + testExpression("array_union(c0, c1)", {array1, array2}, expected); +} + +// Union two integer arrays with null. +TEST_F(ArrayUnionTest, nullArray) { + const auto array1 = makeNullableArrayVector({ + {{1, std::nullopt, 3, 4}}, + {7, 8, 9}, + {{10, std::nullopt, std::nullopt}}, + }); + const auto array2 = makeNullableArrayVector({ + {{std::nullopt, std::nullopt, 3, 5}}, + std::nullopt, + {{1, 10}}, + }); + VectorPtr expected; + + expected = makeNullableArrayVector({ + {{1, std::nullopt, 3, 4, 5}}, + std::nullopt, + {{10, std::nullopt, 1}}, + }); + testExpression("array_union(c0, c1)", {array1, array2}, expected); + + expected = makeNullableArrayVector({ + {{std::nullopt, 3, 5, 1, 4}}, + std::nullopt, + {{1, 10, std::nullopt}}, + }); + testExpression("array_union(c0, c1)", {array2, array1}, expected); +} + +// Union array vectors. +TEST_F(ArrayUnionTest, complexTypes) { + auto baseVector = makeArrayVector( + {{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}}); + + // Create arrays of array vector using above base vector. + // [[1, 1], [2, 2]] + // [[3, 3], [4, 4]] + // [[5, 5], [6, 6]] + auto arrayOfArrays1 = makeArrayVector({0, 2, 4}, baseVector); + // [[1, 1], [2, 2], [3, 3]] + // [[4, 4]] + // [[5, 5], [6, 6]] + auto arrayOfArrays2 = makeArrayVector({0, 3, 4}, baseVector); + + // [[1, 1], [2, 2], [3, 3]] + // [[3, 3], [4, 4]] + // [[5, 5], [6, 6]] + auto expected = makeArrayVector( + {0, 3, 5}, + makeArrayVector( + {{1, 1}, {2, 2}, {3, 3}, {3, 3}, {4, 4}, {5, 5}, {6, 6}})); + + testExpression( + "array_union(c0, c1)", {arrayOfArrays1, arrayOfArrays2}, expected); +} + +// Union double array vectors. +TEST_F(ArrayUnionTest, complexDoubleType) { + auto baseVector = makeArrayVector( + {{1.0, 1.0}, + {2.0, 2.0}, + {3.0, 3.0}, + {4.0, 4.0}, + {5.0, std::numeric_limits::quiet_NaN()}, + {6.0, 6.0}}); + + // Create arrays of array vector using above base vector. + // [[1.0, 1.0], [2.0, 2.0]] + // [[3.0, 3.0], [4.0, 4.0]] + // [[5.0, NaN], [6.0, 6.0]] + auto arrayOfArrays1 = makeArrayVector({0, 2, 4}, baseVector); + // [[1.0, 1.0], [2.0, 2.0], [3.0, 3.0]] + // [[4.0, 4.0]] + // [[5.0, NaN], [6.0, 6.0]] + auto arrayOfArrays2 = makeArrayVector({0, 3, 4}, baseVector); + + // [[1.0, 1.0], [2.0, 2.0], [3.0, 3.0]] + // [[3.0, 3.0], [4.0, 4.0]] + // [[5.0, NaN], [6.0, 6.0]] + auto expected = makeArrayVector( + {0, 3, 5}, + makeArrayVector( + {{1.0, 1.0}, + {2.0, 2.0}, + {3.0, 3.0}, + {3.0, 3.0}, + {4.0, 4.0}, + {5.0, std::numeric_limits::quiet_NaN()}, + {6.0, 6.0}})); + + testExpression( + "array_union(c0, c1)", {arrayOfArrays1, arrayOfArrays2}, expected); +} +} // namespace +} // namespace facebook::velox::functions::sparksql::test diff --git a/velox/functions/sparksql/tests/CMakeLists.txt b/velox/functions/sparksql/tests/CMakeLists.txt index 86d4d1fa5d52..29e3aed498ac 100644 --- a/velox/functions/sparksql/tests/CMakeLists.txt +++ b/velox/functions/sparksql/tests/CMakeLists.txt @@ -18,6 +18,7 @@ add_executable( ArrayMaxTest.cpp ArrayMinTest.cpp ArraySortTest.cpp + ArrayUnionTest.cpp BitwiseTest.cpp ComparisonsTest.cpp DateTimeFunctionsTest.cpp From b3f156f89dd9705ab9cda0945ae08210b50c433d Mon Sep 17 00:00:00 2001 From: rui-mo Date: Fri, 26 Jan 2024 10:50:56 +0800 Subject: [PATCH 03/20] Support struct column reading with different schemas (5962) --- velox/connectors/hive/SplitReader.cpp | 15 +++- .../common/SelectiveStructColumnReader.cpp | 4 +- velox/dwio/common/TypeWithId.h | 5 ++ .../parquet/reader/ParquetColumnReader.cpp | 9 ++- .../dwio/parquet/reader/ParquetColumnReader.h | 3 +- velox/dwio/parquet/reader/ParquetReader.cpp | 46 ++++++++++- .../parquet/reader/RepeatedColumnReader.cpp | 23 ++++-- .../parquet/reader/RepeatedColumnReader.h | 6 +- .../parquet/reader/StructColumnReader.cpp | 40 +++++++-- .../dwio/parquet/reader/StructColumnReader.h | 3 +- .../parquet/tests/examples/contacts.parquet | Bin 0 -> 1355 bytes .../tests/reader/ParquetTableScanTest.cpp | 76 ++++++++++++++++++ 12 files changed, 201 insertions(+), 29 deletions(-) create mode 100644 velox/dwio/parquet/tests/examples/contacts.parquet diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 6395d5d5f8bb..f63e2e36702b 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -200,9 +200,18 @@ std::vector SplitReader::adaptColumns( } else { auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName); if (!fileTypeIdx.has_value()) { - // Column is missing. Most likely due to schema evolution. - VELOX_CHECK(tableSchema); - setNullConstantValue(childSpec, tableSchema->findChild(fieldName)); + // If field name exists in the user-specified output type, + // set the column as null constant. + // Related PR: https://github.com/facebookincubator/velox/pull/6427. + auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName); + if (outputTypeIdx.has_value()) { + setNullConstantValue( + childSpec, readerOutputType_->childAt(outputTypeIdx.value())); + } else { + // Column is missing. Most likely due to schema evolution. + VELOX_CHECK(tableSchema); + setNullConstantValue(childSpec, tableSchema->findChild(fieldName)); + } } else { // Column no longer missing, reset constant value set on the spec. childSpec->setConstantValue(nullptr); diff --git a/velox/dwio/common/SelectiveStructColumnReader.cpp b/velox/dwio/common/SelectiveStructColumnReader.cpp index 1f07f73351e3..ea989bc368fe 100644 --- a/velox/dwio/common/SelectiveStructColumnReader.cpp +++ b/velox/dwio/common/SelectiveStructColumnReader.cpp @@ -135,7 +135,6 @@ void SelectiveStructColumnReaderBase::read( } auto& childSpecs = scanSpec_->children(); - VELOX_CHECK(!childSpecs.empty()); for (size_t i = 0; i < childSpecs.size(); ++i) { auto& childSpec = childSpecs[i]; VELOX_TRACE_HISTORY_PUSH("read %s", childSpec->fieldName().c_str()); @@ -221,7 +220,7 @@ bool SelectiveStructColumnReaderBase::isChildConstant( fileType_->type()->kind() != TypeKind::MAP && // If this is the case it means this is a flat map, // so it can't have "missing" fields. - childSpec.channel() >= fileType_->size()); + !fileType_->containsChild(childSpec.fieldName())); } namespace { @@ -305,7 +304,6 @@ void setNullField( void SelectiveStructColumnReaderBase::getValues( RowSet rows, VectorPtr* result) { - VELOX_CHECK(!scanSpec_->children().empty()); VELOX_CHECK_NOT_NULL( *result, "SelectiveStructColumnReaderBase expects a non-null result"); VELOX_CHECK( diff --git a/velox/dwio/common/TypeWithId.h b/velox/dwio/common/TypeWithId.h index 953ac87b2b8c..96c6cd38fc47 100644 --- a/velox/dwio/common/TypeWithId.h +++ b/velox/dwio/common/TypeWithId.h @@ -59,6 +59,11 @@ class TypeWithId : public velox::Tree> { const std::shared_ptr& childAt(uint32_t idx) const override; + bool containsChild(const std::string& name) const { + VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW); + return type_->as().containsChild(name); + } + const std::shared_ptr& childByName( const std::string& name) const { VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW); diff --git a/velox/dwio/parquet/reader/ParquetColumnReader.cpp b/velox/dwio/parquet/reader/ParquetColumnReader.cpp index c3816c0e960a..9939ec4c3e71 100644 --- a/velox/dwio/parquet/reader/ParquetColumnReader.cpp +++ b/velox/dwio/parquet/reader/ParquetColumnReader.cpp @@ -35,7 +35,8 @@ std::unique_ptr ParquetColumnReader::build( const std::shared_ptr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, - common::ScanSpec& scanSpec) { + common::ScanSpec& scanSpec, + memory::MemoryPool& pool) { auto colName = scanSpec.fieldName(); switch (fileType->type()->kind()) { @@ -56,7 +57,7 @@ std::unique_ptr ParquetColumnReader::build( case TypeKind::ROW: return std::make_unique( - requestedType, fileType, params, scanSpec); + requestedType, fileType, params, scanSpec, pool); case TypeKind::VARBINARY: case TypeKind::VARCHAR: @@ -64,11 +65,11 @@ std::unique_ptr ParquetColumnReader::build( case TypeKind::ARRAY: return std::make_unique( - requestedType, fileType, params, scanSpec); + requestedType, fileType, params, scanSpec, pool); case TypeKind::MAP: return std::make_unique( - requestedType, fileType, params, scanSpec); + requestedType, fileType, params, scanSpec, pool); case TypeKind::BOOLEAN: return std::make_unique( diff --git a/velox/dwio/parquet/reader/ParquetColumnReader.h b/velox/dwio/parquet/reader/ParquetColumnReader.h index 516a500cd22c..34a5b2582737 100644 --- a/velox/dwio/parquet/reader/ParquetColumnReader.h +++ b/velox/dwio/parquet/reader/ParquetColumnReader.h @@ -45,6 +45,7 @@ class ParquetColumnReader { const std::shared_ptr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, - common::ScanSpec& scanSpec); + common::ScanSpec& scanSpec, + memory::MemoryPool& pool); }; } // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 7d9a7d97da19..7dc1fb6d5467 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -84,6 +84,11 @@ class ReaderBase { /// the data still exists in the buffered inputs. bool isRowGroupBuffered(int32_t rowGroupIndex) const; + static std::shared_ptr createTypeWithId( + const std::shared_ptr& inputType, + const RowTypePtr& rowTypePtr, + bool fileColumnNamesReadAsLowerCase); + private: // Reads and parses file footer. void loadFileMetaData(); @@ -589,6 +594,33 @@ std::shared_ptr ReaderBase::createRowType( std::move(childNames), std::move(childTypes)); } +std::shared_ptr ReaderBase::createTypeWithId( + const std::shared_ptr& inputType, + const RowTypePtr& rowTypePtr, + bool fileColumnNamesReadAsLowerCase) { + if (!fileColumnNamesReadAsLowerCase) { + return inputType; + } + std::vector names; + names.reserve(rowTypePtr->names().size()); + std::vector types = rowTypePtr->children(); + for (const auto& name : rowTypePtr->names()) { + std::string childName = name; + folly::toLowerAscii(childName); + names.emplace_back(childName); + } + auto convertedType = + TypeFactory::create(std::move(names), std::move(types)); + + auto children = inputType->getChildren(); + return std::make_shared( + convertedType, + std::move(children), + inputType->id(), + inputType->maxId(), + inputType->column()); +} + void ReaderBase::scheduleRowGroups( const std::vector& rowGroupIds, int32_t currentGroup, @@ -662,13 +694,19 @@ class ParquetRowReader::Impl { } ParquetParams params( pool_, columnReaderStats_, readerBase_->fileMetaData()); - auto columnSelector = std::make_shared( - ColumnSelector::apply(options_.getSelector(), readerBase_->schema())); + auto columnSelector = options_.getSelector() == nullptr + ? std::make_shared(ColumnSelector::apply( + options_.getSelector(), readerBase_->schema())) + : options_.getSelector(); columnReader_ = ParquetColumnReader::build( - columnSelector->getSchemaWithId(), + ReaderBase::createTypeWithId( + columnSelector->getSchemaWithId(), + asRowType(columnSelector->getSchemaWithId()->type()), + readerBase_->isFileColumnNamesReadAsLowerCase()), readerBase_->schemaWithId(), // Id is schema id params, - *options_.getScanSpec()); + *options_.getScanSpec(), + pool_); filterRowGroups(); if (!rowGroupIds_.empty()) { diff --git a/velox/dwio/parquet/reader/RepeatedColumnReader.cpp b/velox/dwio/parquet/reader/RepeatedColumnReader.cpp index 250bd204e083..743bfd1be941 100644 --- a/velox/dwio/parquet/reader/RepeatedColumnReader.cpp +++ b/velox/dwio/parquet/reader/RepeatedColumnReader.cpp @@ -33,6 +33,9 @@ PageReader* FOLLY_NULLABLE readLeafRepDefs( return nullptr; } auto pageReader = reader->formatData().as().reader(); + if (pageReader == nullptr) { + return nullptr; + } pageReader->decodeRepDefs(numTop); return pageReader; } @@ -113,7 +116,8 @@ MapColumnReader::MapColumnReader( const std::shared_ptr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, - common::ScanSpec& scanSpec) + common::ScanSpec& scanSpec, + memory::MemoryPool& pool) : dwio::common::SelectiveMapColumnReader( requestedType, fileType, @@ -123,9 +127,17 @@ MapColumnReader::MapColumnReader( auto& keyChildType = requestedType->childAt(0); auto& elementChildType = requestedType->childAt(1); keyReader_ = ParquetColumnReader::build( - keyChildType, fileType_->childAt(0), params, *scanSpec.children()[0]); + keyChildType, + fileType_->childAt(0), + params, + *scanSpec.children()[0], + pool); elementReader_ = ParquetColumnReader::build( - elementChildType, fileType_->childAt(1), params, *scanSpec.children()[1]); + elementChildType, + fileType_->childAt(1), + params, + *scanSpec.children()[1], + pool); reinterpret_cast(fileType.get()) ->makeLevelInfo(levelInfo_); children_ = {keyReader_.get(), elementReader_.get()}; @@ -223,7 +235,8 @@ ListColumnReader::ListColumnReader( const std::shared_ptr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, - common::ScanSpec& scanSpec) + common::ScanSpec& scanSpec, + memory::MemoryPool& pool) : dwio::common::SelectiveListColumnReader( requestedType, fileType, @@ -231,7 +244,7 @@ ListColumnReader::ListColumnReader( scanSpec) { auto& childType = requestedType->childAt(0); child_ = ParquetColumnReader::build( - childType, fileType_->childAt(0), params, *scanSpec.children()[0]); + childType, fileType_->childAt(0), params, *scanSpec.children()[0], pool); reinterpret_cast(fileType.get()) ->makeLevelInfo(levelInfo_); children_ = {child_.get()}; diff --git a/velox/dwio/parquet/reader/RepeatedColumnReader.h b/velox/dwio/parquet/reader/RepeatedColumnReader.h index 3155e8d66478..d6c68d2239a9 100644 --- a/velox/dwio/parquet/reader/RepeatedColumnReader.h +++ b/velox/dwio/parquet/reader/RepeatedColumnReader.h @@ -59,7 +59,8 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader { const std::shared_ptr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, - common::ScanSpec& scanSpec); + common::ScanSpec& scanSpec, + memory::MemoryPool& pool); void prepareRead( vector_size_t offset, @@ -115,7 +116,8 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader { const std::shared_ptr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, - common::ScanSpec& scanSpec); + common::ScanSpec& scanSpec, + memory::MemoryPool& pool); void prepareRead( vector_size_t offset, diff --git a/velox/dwio/parquet/reader/StructColumnReader.cpp b/velox/dwio/parquet/reader/StructColumnReader.cpp index eca887eab155..af8000046e77 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.cpp +++ b/velox/dwio/parquet/reader/StructColumnReader.cpp @@ -30,21 +30,46 @@ StructColumnReader::StructColumnReader( const std::shared_ptr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, - common::ScanSpec& scanSpec) + common::ScanSpec& scanSpec, + memory::MemoryPool& pool) : SelectiveStructColumnReader(requestedType, fileType, params, scanSpec) { auto& childSpecs = scanSpec_->stableChildren(); + std::vector missingFields; for (auto i = 0; i < childSpecs.size(); ++i) { auto childSpec = childSpecs[i]; if (childSpecs[i]->isConstant()) { continue; } - auto childFileType = fileType_->childByName(childSpec->fieldName()); - auto childRequestedType = - requestedType_->childByName(childSpec->fieldName()); + const auto& fieldName = childSpec->fieldName(); + if (!fileType_->containsChild(fieldName)) { + missingFields.emplace_back(i); + continue; + } + auto childFileType = fileType_->childByName(fieldName); + auto childRequestedType = requestedType_->childByName(fieldName); addChild(ParquetColumnReader::build( - childRequestedType, childFileType, params, *childSpec)); + childRequestedType, childFileType, params, *childSpec, pool)); childSpecs[i]->setSubscript(children_.size() - 1); } + + if (missingFields.size() > 0) { + // Set the struct as null if all the children fields in the output type are + // missing and the number of child fields is more than one. + if (childSpecs.size() > 1 && missingFields.size() == childSpecs.size()) { + scanSpec_->setConstantValue( + BaseVector::createNullConstant(requestedType_->type(), 1, &pool)); + } else { + // Set null constant for the missing child field of output type. + for (int channel : missingFields) { + childSpecs[channel]->setConstantValue(BaseVector::createNullConstant( + requestedType_->childByName(childSpecs[channel]->fieldName()) + ->type(), + 1, + &pool)); + } + } + } + auto type = reinterpret_cast(fileType_.get()); if (type->parent()) { levelMode_ = reinterpret_cast(fileType_.get()) @@ -54,7 +79,10 @@ StructColumnReader::StructColumnReader( // this and the child. auto child = childForRepDefs_; for (;;) { - assert(child); + if (child == nullptr) { + levelMode_ = LevelMode::kNulls; + break; + } if (child->fileType().type()->kind() == TypeKind::ARRAY || child->fileType().type()->kind() == TypeKind::MAP) { levelMode_ = LevelMode::kStructOverLists; diff --git a/velox/dwio/parquet/reader/StructColumnReader.h b/velox/dwio/parquet/reader/StructColumnReader.h index f38c9e849c73..f03d5549387d 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.h +++ b/velox/dwio/parquet/reader/StructColumnReader.h @@ -35,7 +35,8 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader { const std::shared_ptr& requestedType, const std::shared_ptr& fileType, ParquetParams& params, - common::ScanSpec& scanSpec); + common::ScanSpec& scanSpec, + memory::MemoryPool& pool); void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls) override; diff --git a/velox/dwio/parquet/tests/examples/contacts.parquet b/velox/dwio/parquet/tests/examples/contacts.parquet new file mode 100644 index 0000000000000000000000000000000000000000..fa3751f8dc46282db3ad2faabe5e0b69dfdba1f1 GIT binary patch literal 1355 zcmb7^-)qxQ6vuCG(;A^Lh22X6eF(vp4Qg1EwcScz96rdHGWMv5yGw3sFipE8oygdK zF~q;YCx3u{fRB4sWQzE>2OoSA+0*dc^tS6-CJsVw&X4=~o}6=&%{yCFf{02Az5|>LLNh+ zP3q*&o0p%ZTuRipg
  • 2lraNVcci^fE#xse~;@ov92j1EEj@x3Wjj!Qt{DxAt1y= zAQe5|F*i-urah8Uq$+8G4$_Nsjcm!M!_4^cBttymh5X=NA?~y2*^}6P%Dar@!C68Y z_B~X?ni8@uCki5k64r8UKZ=tQ+9-;x!lMN2qT?F#yokrKOrlsX>KkRzExn{d697vv z#k5YFWwkYI&G1gqLfWXI1(id$p%jGZ2ZhYjBTOt*UMrGAB$D`Qq zg<8(BY|BR4;!H-o8(45Mb_YDMqTX|>%M<3YglRwVzL~t}bF*p2Ni=X1vtqV=9(b|Y ze6(+hBLgAd1Gh}`!-RKu1debJ1Z*dOgfR?ku@0 tORJ8vQs-{Xb{)6P>aMfA)8K8#v8#@4yQ|BKy7+R(7dnGK;U@e>{{rEFM9lyI literal 0 HcmV?d00001 diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 1ec63e3e793a..4d9a4dfa648b 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -443,6 +443,82 @@ TEST_F(ParquetTableScanTest, readAsLowerCase) { result.second, {makeRowVector({"a"}, {makeFlatVector({0, 1})})}); } +TEST_F(ParquetTableScanTest, structSelection) { + auto vector = makeArrayVector({{}}); + loadData( + getExampleFilePath("contacts.parquet"), + ROW({"name"}, {ROW({"first", "last"}, {VARCHAR(), VARCHAR()})}), + makeRowVector( + {"t"}, + { + vector, + })); + assertSelectWithFilter({"name"}, {}, "", "SELECT ('Janet', 'Jones')"); + + loadData( + getExampleFilePath("contacts.parquet"), + ROW({"name"}, + {ROW( + {"first", "middle", "last"}, {VARCHAR(), VARCHAR(), VARCHAR()})}), + makeRowVector( + {"t"}, + { + vector, + })); + assertSelectWithFilter({"name"}, {}, "", "SELECT ('Janet', null, 'Jones')"); + + loadData( + getExampleFilePath("contacts.parquet"), + ROW({"name"}, {ROW({"first", "middle"}, {VARCHAR(), VARCHAR()})}), + makeRowVector( + {"t"}, + { + vector, + })); + assertSelectWithFilter({"name"}, {}, "", "SELECT ('Janet', null)"); + + loadData( + getExampleFilePath("contacts.parquet"), + ROW({"name"}, {ROW({"middle", "last"}, {VARCHAR(), VARCHAR()})}), + makeRowVector( + {"t"}, + { + vector, + })); + assertSelectWithFilter({"name"}, {}, "", "SELECT (null, 'Jones')"); + + loadData( + getExampleFilePath("contacts.parquet"), + ROW({"name"}, {ROW({"middle"}, {VARCHAR()})}), + makeRowVector( + {"t"}, + { + vector, + })); + assertSelectWithFilter({"name"}, {}, "", "SELECT row(null)"); + + loadData( + getExampleFilePath("contacts.parquet"), + ROW({"name"}, {ROW({"middle", "info"}, {VARCHAR(), VARCHAR()})}), + makeRowVector( + {"t"}, + { + vector, + })); + assertSelectWithFilter({"name"}, {}, "", "SELECT NULL"); + + loadData( + getExampleFilePath("contacts.parquet"), + ROW({"name"}, {ROW({}, {})}), + makeRowVector( + {"t"}, + { + vector, + })); + + assertSelectWithFilter({"name"}, {}, "", "SELECT t from tmp"); +} + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); folly::Init init{&argc, &argv, false}; From cb7adad46ca3bc83cf5bd8c9244a72497d1178a6 Mon Sep 17 00:00:00 2001 From: Jia Date: Thu, 23 Nov 2023 00:58:13 +0000 Subject: [PATCH 04/20] Support timestamp reader for Parquet file format (4680) --- velox/connectors/hive/HiveDataSource.cpp | 13 ++- velox/dwio/common/SelectiveColumnReader.cpp | 3 + velox/dwio/parquet/reader/PageReader.cpp | 48 +++++++- .../parquet/reader/ParquetColumnReader.cpp | 5 + velox/dwio/parquet/reader/ParquetReader.cpp | 2 +- .../parquet/reader/TimestampColumnReader.h | 49 ++++++++ .../tests/examples/timestamp_int96.parquet | Bin 0 -> 560 bytes .../tests/reader/ParquetTableScanTest.cpp | 106 ++++++++++++++++++ velox/exec/tests/utils/PlanBuilder.cpp | 6 +- velox/exec/tests/utils/PlanBuilder.h | 9 +- velox/type/Type.h | 5 + 11 files changed, 236 insertions(+), 10 deletions(-) create mode 100644 velox/dwio/parquet/reader/TimestampColumnReader.h create mode 100644 velox/dwio/parquet/tests/examples/timestamp_int96.parquet diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 6ff99105364c..d3168dc7d584 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -158,11 +158,14 @@ HiveDataSource::HiveDataSource( for (auto& [k, v] : hiveTableHandle_->subfieldFilters()) { filters.emplace(k.clone(), v->clone()); } - auto remainingFilter = extractFiltersFromRemainingFilter( - hiveTableHandle_->remainingFilter(), - expressionEvaluator_, - false, - filters); + auto remainingFilter = hiveTableHandle_->remainingFilter(); + if (hiveTableHandle_->isFilterPushdownEnabled()) { + remainingFilter = extractFiltersFromRemainingFilter( + hiveTableHandle_->remainingFilter(), + expressionEvaluator_, + false, + filters); + } std::vector remainingFilterSubfields; if (remainingFilter) { diff --git a/velox/dwio/common/SelectiveColumnReader.cpp b/velox/dwio/common/SelectiveColumnReader.cpp index 25aff8eb42c3..81fba3bf2bac 100644 --- a/velox/dwio/common/SelectiveColumnReader.cpp +++ b/velox/dwio/common/SelectiveColumnReader.cpp @@ -215,6 +215,9 @@ void SelectiveColumnReader::getIntValues( VELOX_FAIL("Unsupported value size: {}", valueSize_); } break; + case TypeKind::TIMESTAMP: + getFlatValues(rows, result, requestedType); + break; default: VELOX_FAIL( "Not a valid type for integer reader: {}", requestedType->toString()); diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index e6f44f42ecca..6a71e8108c88 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -366,6 +366,51 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { } break; } + case thrift::Type::INT96: { + auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp); + dictionary_.values = AlignedBuffer::allocate(numVeloxBytes, &pool_); + auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp); + if (pageData_) { + memcpy(dictionary_.values->asMutable(), pageData_, numBytes); + } else { + dwio::common::readBytes( + numBytes, + inputStream_.get(), + dictionary_.values->asMutable(), + bufferStart_, + bufferEnd_); + } + // Expand the Parquet type length values to Velox type length. + // We start from the end to allow in-place expansion. + auto values = dictionary_.values->asMutable(); + auto parquetValues = dictionary_.values->asMutable(); + static constexpr int64_t kJulianToUnixEpochDays = 2440588LL; + static constexpr int64_t kSecondsPerDay = 86400LL; + static constexpr int64_t kNanosPerSecond = + Timestamp::kNanosecondsInMillisecond * + Timestamp::kMillisecondsInSecond; + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + // Convert the timestamp into seconds and nanos since the Unix epoch, + // 00:00:00.000000 on 1 January 1970. + uint64_t nanos; + memcpy( + &nanos, + parquetValues + i * sizeof(Int96Timestamp), + sizeof(uint64_t)); + int32_t days; + memcpy( + &days, + parquetValues + i * sizeof(Int96Timestamp) + sizeof(uint64_t), + sizeof(int32_t)); + int64_t seconds = (days - kJulianToUnixEpochDays) * kSecondsPerDay; + if (nanos > Timestamp::kMaxNanos) { + seconds += nanos / kNanosPerSecond; + nanos -= (nanos / kNanosPerSecond) * kNanosPerSecond; + } + values[i] = Timestamp(seconds, nanos); + } + break; + } case thrift::Type::BYTE_ARRAY: { dictionary_.values = AlignedBuffer::allocate(dictionary_.numValues, &pool_); @@ -456,7 +501,6 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { VELOX_UNSUPPORTED( "Parquet type {} not supported for dictionary", parquetType); } - case thrift::Type::INT96: default: VELOX_UNSUPPORTED( "Parquet type {} not supported for dictionary", parquetType); @@ -483,6 +527,8 @@ int32_t parquetTypeBytes(thrift::Type::type type) { case thrift::Type::INT64: case thrift::Type::DOUBLE: return 8; + case thrift::Type::INT96: + return 12; default: VELOX_FAIL("Type does not have a byte width {}", type); } diff --git a/velox/dwio/parquet/reader/ParquetColumnReader.cpp b/velox/dwio/parquet/reader/ParquetColumnReader.cpp index 9939ec4c3e71..da7db830e32a 100644 --- a/velox/dwio/parquet/reader/ParquetColumnReader.cpp +++ b/velox/dwio/parquet/reader/ParquetColumnReader.cpp @@ -27,6 +27,7 @@ #include "velox/dwio/parquet/reader/RepeatedColumnReader.h" #include "velox/dwio/parquet/reader/StringColumnReader.h" #include "velox/dwio/parquet/reader/StructColumnReader.h" +#include "velox/dwio/parquet/reader/TimestampColumnReader.h" namespace facebook::velox::parquet { @@ -75,6 +76,10 @@ std::unique_ptr ParquetColumnReader::build( return std::make_unique( requestedType, fileType, params, scanSpec); + case TypeKind::TIMESTAMP: + return std::make_unique( + requestedType, fileType, params, scanSpec); + default: VELOX_FAIL( "buildReader unhandled type: " + diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 7dc1fb6d5467..18633dd11c9f 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -557,7 +557,7 @@ TypePtr ReaderBase::convertType( case thrift::Type::type::INT64: return BIGINT(); case thrift::Type::type::INT96: - return DOUBLE(); // TODO: Lose precision + return TIMESTAMP(); case thrift::Type::type::FLOAT: return REAL(); case thrift::Type::type::DOUBLE: diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h new file mode 100644 index 000000000000..4c534b4bfcee --- /dev/null +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -0,0 +1,49 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/dwio/parquet/reader/IntegerColumnReader.h" +#include "velox/dwio/parquet/reader/ParquetColumnReader.h" + +namespace facebook::velox::parquet { + +class TimestampColumnReader : public IntegerColumnReader { + public: + TimestampColumnReader( + const std::shared_ptr& requestedType, + std::shared_ptr fileType, + ParquetParams& params, + common::ScanSpec& scanSpec) + : IntegerColumnReader(requestedType, fileType, params, scanSpec) {} + + bool hasBulkPath() const override { + return false; + } + + void read( + vector_size_t offset, + RowSet rows, + const uint64_t* /*incomingNulls*/) override { + auto& data = formatData_->as(); + // Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length. + prepareRead(offset, rows, nullptr); + readCommon(rows); + readOffset_ += rows.back() + 1; + } +}; + +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/tests/examples/timestamp_int96.parquet b/velox/dwio/parquet/tests/examples/timestamp_int96.parquet new file mode 100644 index 0000000000000000000000000000000000000000..ea3a125aab6062da3e978a20eae207b0755b169c GIT binary patch literal 560 zcmZWnL1@!Z8286zK zGzJ6lC}Rgf@Zd!~3Z6WB^yp0#EF!p_^q}aU(VaZKkMIBf@B4V~tt_sVsYs6nIuz*N zlPB+md8$wm;DHVA&OIwE0Nma!!UY-Xf_!}K=F-WlF@RxE1W;anUbqPGmLvYQUic&f zTslD9e*WXC0Pyt&V)^HtH#WfHCgMkr*Zvd$4uy$2>3qyB1JpB{UrWEAo(FiH6oCRY z=>*+>vG+kTvKmyPd4TbG0vIQVxpDW__90^3P>%>*RS{C6+t=l!%i{a%iNDDsMhQ@< zNHie@RnZOIC>;@lO8ShH7ja!E8L%Pea~;PitkM~s=VcSZrO8P Te{OnGHw?Vy4SdlR{44(eD@c)t literal 0 HcmV?d00001 diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 4d9a4dfa648b..e1d99603ea65 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -73,6 +73,34 @@ class ParquetTableScanTest : public HiveConnectorTestBase { assertQuery(plan, splits_, sql); } + void assertSelectWithFilter( + std::vector&& outputColumnNames, + const std::vector& subfieldFilters, + const std::string& remainingFilter, + const std::string& sql, + bool isFilterPushdownEnabled) { + auto rowType = getRowType(std::move(outputColumnNames)); + parse::ParseOptions options; + options.parseDecimalAsDouble = false; + + auto plan = PlanBuilder(pool_.get()) + .setParseOptions(options) + // Function extractFiltersFromRemainingFilter will extract + // filters to subfield filters, but for some types, filter + // pushdown is not supported. + .tableScan( + "hive_table", + rowType, + {}, + subfieldFilters, + remainingFilter, + nullptr, + isFilterPushdownEnabled) + .planNode(); + + assertQuery(plan, splits_, sql); + } + void assertSelectWithAgg( std::vector&& outputColumnNames, const std::vector& aggregates, @@ -519,6 +547,84 @@ TEST_F(ParquetTableScanTest, structSelection) { assertSelectWithFilter({"name"}, {}, "", "SELECT t from tmp"); } +TEST_F(ParquetTableScanTest, timestampFilter) { + // Timestamp-int96.parquet holds one column (t: TIMESTAMP) and + // 10 rows in one row group. Data is in SNAPPY compressed format. + // The values are: + // |t | + // +-------------------+ + // |2015-06-01 19:34:56| + // |2015-06-02 19:34:56| + // |2001-02-03 03:34:06| + // |1998-03-01 08:01:06| + // |2022-12-23 03:56:01| + // |1980-01-24 00:23:07| + // |1999-12-08 13:39:26| + // |2023-04-21 09:09:34| + // |2000-09-12 22:36:29| + // |2007-12-12 04:27:56| + // +-------------------+ + auto vector = makeFlatVector( + {Timestamp(1433116800, 70496000000000), + Timestamp(1433203200, 70496000000000), + Timestamp(981158400, 12846000000000), + Timestamp(888710400, 28866000000000), + Timestamp(1671753600, 14161000000000), + Timestamp(317520000, 1387000000000), + Timestamp(944611200, 49166000000000), + Timestamp(1682035200, 32974000000000), + Timestamp(968716800, 81389000000000), + Timestamp(1197417600, 16076000000000)}); + + loadData( + getExampleFilePath("timestamp_int96.parquet"), + ROW({"t"}, {TIMESTAMP()}), + makeRowVector( + {"t"}, + { + vector, + })); + + assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp", false); + assertSelectWithFilter( + {"t"}, + {}, + "t < TIMESTAMP '2000-09-12 22:36:29'", + "SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'", + false); + assertSelectWithFilter( + {"t"}, + {}, + "t <= TIMESTAMP '2000-09-12 22:36:29'", + "SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'", + false); + assertSelectWithFilter( + {"t"}, + {}, + "t > TIMESTAMP '1980-01-24 00:23:07'", + "SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'", + false); + assertSelectWithFilter( + {"t"}, + {}, + "t >= TIMESTAMP '1980-01-24 00:23:07'", + "SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'", + false); + assertSelectWithFilter( + {"t"}, + {}, + "t == TIMESTAMP '2022-12-23 03:56:01'", + "SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'", + false); + VELOX_ASSERT_THROW( + assertSelectWithFilter( + {"t"}, + {"t < TIMESTAMP '2000-09-12 22:36:29'"}, + "", + "SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"), + "testInt128() is not supported"); +} + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); folly::Init init{&argc, &argv, false}; diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 85b3805db097..85c6e2c7d059 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -97,12 +97,14 @@ PlanBuilder& PlanBuilder::tableScan( const std::unordered_map& columnAliases, const std::vector& subfieldFilters, const std::string& remainingFilter, - const RowTypePtr& dataColumns) { + const RowTypePtr& dataColumns, + const bool isFilterPushdownEnabled) { return TableScanBuilder(*this) .tableName(tableName) .outputType(outputType) .columnAliases(columnAliases) .subfieldFilters(subfieldFilters) + .isFilterPushdownEnabled(isFilterPushdownEnabled) .remainingFilter(remainingFilter) .dataColumns(dataColumns) .endTableScan(); @@ -200,7 +202,7 @@ core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) { tableHandle_ = std::make_shared( connectorId_, tableName_, - true, + isFilterPushdownEnabled_, std::move(filters), remainingFilterExpr, dataColumns_); diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 5adf218555a0..dcafe082e945 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -144,7 +144,8 @@ class PlanBuilder { const std::unordered_map& columnAliases = {}, const std::vector& subfieldFilters = {}, const std::string& remainingFilter = "", - const RowTypePtr& dataColumns = nullptr); + const RowTypePtr& dataColumns = nullptr, + bool isFilterPushdownEnabled = true); /// Add a TableScanNode to scan a TPC-H table. /// @@ -209,6 +210,11 @@ class PlanBuilder { return *this; } + TableScanBuilder& isFilterPushdownEnabled(bool isFilterPushdownEnabled) { + isFilterPushdownEnabled_ = std::move(isFilterPushdownEnabled); + return *this; + } + /// @param dataColumns can be different from 'outputType' for the purposes /// of testing queries using missing columns. It is used, if specified, for /// parseExpr call and as 'dataColumns' for the TableHandle. You supply more @@ -269,6 +275,7 @@ class PlanBuilder { std::shared_ptr tableHandle_; std::unordered_map> assignments_; + bool isFilterPushdownEnabled_; }; /// Start a TableScanBuilder. diff --git a/velox/type/Type.h b/velox/type/Type.h index 5670d1f34b63..6c25ea3a9915 100644 --- a/velox/type/Type.h +++ b/velox/type/Type.h @@ -45,6 +45,11 @@ namespace facebook::velox { using int128_t = __int128_t; +struct __attribute__((__packed__)) Int96Timestamp { + int32_t days; + uint64_t nanos; +}; + /// Velox type system supports a small set of SQL-compatible composeable types: /// BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, HUGEINT, REAL, DOUBLE, VARCHAR, /// VARBINARY, TIMESTAMP, ARRAY, MAP, ROW From 9b47301b1cdbd0fa85343ede2106a19af4a68120 Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Thu, 1 Feb 2024 17:49:51 +0800 Subject: [PATCH 05/20] Add config for registration (7110) --- .../lib/aggregates/BitwiseAggregateBase.h | 8 +++- .../aggregates/BitwiseAggregates.cpp | 14 +++++-- .../prestosql/aggregates/CountAggregate.cpp | 9 ++++- .../aggregates/CovarianceAggregates.cpp | 28 ++++++++++---- .../prestosql/aggregates/MinMaxAggregates.cpp | 18 ++++++--- .../aggregates/RegisterAggregateFunctions.cpp | 38 +++++++++++++------ .../aggregates/RegisterAggregateFunctions.h | 3 +- .../aggregates/VarianceAggregates.cpp | 32 +++++++++++----- .../sparksql/aggregates/AverageAggregate.cpp | 6 ++- .../sparksql/aggregates/AverageAggregate.h | 3 +- .../aggregates/BitwiseXorAggregate.cpp | 6 ++- .../sparksql/aggregates/BitwiseXorAggregate.h | 4 +- .../aggregates/FirstLastAggregate.cpp | 26 +++++++++---- .../sparksql/aggregates/MinMaxByAggregate.cpp | 20 +++++++--- .../sparksql/aggregates/Register.cpp | 23 +++++++---- .../functions/sparksql/aggregates/Register.h | 3 +- .../sparksql/aggregates/SumAggregate.cpp | 9 ++++- .../sparksql/aggregates/SumAggregate.h | 5 ++- 18 files changed, 184 insertions(+), 71 deletions(-) diff --git a/velox/functions/lib/aggregates/BitwiseAggregateBase.h b/velox/functions/lib/aggregates/BitwiseAggregateBase.h index 1e62bc1e4fda..6f2aeaad4bc3 100644 --- a/velox/functions/lib/aggregates/BitwiseAggregateBase.h +++ b/velox/functions/lib/aggregates/BitwiseAggregateBase.h @@ -72,7 +72,9 @@ class BitwiseAggregateBase : public SimpleNumericAggregate { template