From 801e2ce28c2a22b9c4e7230de26965252a7d2c83 Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Mon, 9 Oct 2023 15:19:19 -0700 Subject: [PATCH] Introducing IcebergSplitReader In this commit we introduces IcebergSplitReader which supports reading Iceberg splits with positional delete files. --- scripts/setup-macos.sh | 3 + velox/benchmarks/tpch/CMakeLists.txt | 1 + velox/common/testutil/tests/CMakeLists.txt | 10 +- velox/connectors/hive/CMakeLists.txt | 31 ++- velox/connectors/hive/FileHandle.h | 8 +- velox/connectors/hive/HiveConnector.cpp | 1 - velox/connectors/hive/HiveDataSource.cpp | 3 +- velox/connectors/hive/HiveDataSource.h | 6 +- velox/connectors/hive/SplitReader.cpp | 44 ++-- velox/connectors/hive/SplitReader.h | 6 +- .../connectors/hive/benchmarks/CMakeLists.txt | 1 + velox/connectors/hive/iceberg/CMakeLists.txt | 37 ++++ .../hive/iceberg/DeleteFileReader.cpp | 192 ++++++++++++++++ .../hive/iceberg/DeleteFileReader.h | 84 +++++++ .../hive/iceberg/IcebergDeleteFile.h | 69 ++++++ .../hive/iceberg/IcebergMetadataColumns.h | 56 +++++ .../connectors/hive/iceberg/IcebergSplit.cpp | 69 ++++++ velox/connectors/hive/iceberg/IcebergSplit.h | 56 +++++ .../hive/iceberg/IcebergSplitReader.cpp | 105 +++++++++ .../hive/iceberg/IcebergSplitReader.h | 61 +++++ .../hive/iceberg/tests/CMakeLists.txt | 28 +++ .../hive/iceberg/tests/IcebergReadTest.cpp | 208 ++++++++++++++++++ .../gcs/examples/CMakeLists.txt | 1 + .../storage_adapters/gcs/tests/CMakeLists.txt | 1 + .../hdfs/tests/CMakeLists.txt | 1 + velox/connectors/hive/tests/CMakeLists.txt | 1 + velox/dwio/dwrf/test/CMakeLists.txt | 2 + velox/dwio/parquet/tests/CMakeLists.txt | 1 + .../dwio/parquet/tests/reader/CMakeLists.txt | 2 + velox/examples/CMakeLists.txt | 1 + velox/exec/tests/CMakeLists.txt | 3 + velox/exec/tests/utils/CMakeLists.txt | 1 + .../codegen/benchmark/CMakeLists.txt | 4 +- .../experimental/codegen/tests/CMakeLists.txt | 1 + .../vector_function/tests/CMakeLists.txt | 1 + .../wave/exec/tests/CMakeLists.txt | 1 + velox/expression/tests/CMakeLists.txt | 10 +- .../aggregates/benchmarks/CMakeLists.txt | 2 + .../prestosql/aggregates/tests/CMakeLists.txt | 1 + .../sparksql/aggregates/tests/CMakeLists.txt | 1 + velox/substrait/tests/CMakeLists.txt | 1 + 41 files changed, 1065 insertions(+), 50 deletions(-) create mode 100644 velox/connectors/hive/iceberg/CMakeLists.txt create mode 100644 velox/connectors/hive/iceberg/DeleteFileReader.cpp create mode 100644 velox/connectors/hive/iceberg/DeleteFileReader.h create mode 100644 velox/connectors/hive/iceberg/IcebergDeleteFile.h create mode 100644 velox/connectors/hive/iceberg/IcebergMetadataColumns.h create mode 100644 velox/connectors/hive/iceberg/IcebergSplit.cpp create mode 100644 velox/connectors/hive/iceberg/IcebergSplit.h create mode 100644 velox/connectors/hive/iceberg/IcebergSplitReader.cpp create mode 100644 velox/connectors/hive/iceberg/IcebergSplitReader.h create mode 100644 velox/connectors/hive/iceberg/tests/CMakeLists.txt create mode 100644 velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp diff --git a/scripts/setup-macos.sh b/scripts/setup-macos.sh index 83d8990603eae..321b66be4d347 100755 --- a/scripts/setup-macos.sh +++ b/scripts/setup-macos.sh @@ -25,6 +25,9 @@ # $ scripts/setup-macos.sh install_googletest install_fmt # +INSTALL_PREREQUISITES="N" +PROMPT_ALWAYS_RESPOND="n" + set -e # Exit on error. set -x # Print commands that are executed. diff --git a/velox/benchmarks/tpch/CMakeLists.txt b/velox/benchmarks/tpch/CMakeLists.txt index ef0147ad5c93c..7040028929ea6 100644 --- a/velox/benchmarks/tpch/CMakeLists.txt +++ b/velox/benchmarks/tpch/CMakeLists.txt @@ -24,6 +24,7 @@ target_link_libraries( velox_dwio_parquet_reader velox_dwio_common_test_utils velox_hive_connector + # velox_hive_iceberg_splitreader velox_exception velox_memory velox_process diff --git a/velox/common/testutil/tests/CMakeLists.txt b/velox/common/testutil/tests/CMakeLists.txt index 02b7ee5f1045b..d232c201d931d 100644 --- a/velox/common/testutil/tests/CMakeLists.txt +++ b/velox/common/testutil/tests/CMakeLists.txt @@ -16,11 +16,5 @@ add_executable(velox_test_util_test TestValueTest.cpp SpillConfigTest.cpp) gtest_add_tests(velox_test_util_test "" AUTO) target_link_libraries( - velox_test_util_test - PRIVATE - velox_test_util - velox_exception - velox_spill_config - velox_exec - gtest - gtest_main) + velox_test_util_test PRIVATE velox_test_util velox_exception + velox_spill_config velox_exec gtest gtest_main) diff --git a/velox/connectors/hive/CMakeLists.txt b/velox/connectors/hive/CMakeLists.txt index 5fbd19245169e..97f6026ebbc85 100644 --- a/velox/connectors/hive/CMakeLists.txt +++ b/velox/connectors/hive/CMakeLists.txt @@ -16,6 +16,10 @@ add_library(velox_hive_config OBJECT HiveConfig.cpp) target_link_libraries(velox_hive_config velox_exception) +add_subdirectory(iceberg) + +# find_package(iceberg CONFIG REQUIRED) + add_library( velox_hive_connector OBJECT FileHandle.cpp @@ -28,25 +32,28 @@ add_library( SplitReader.cpp TableHandle.cpp) +# target_include_directories(velox_hive_connector PUBLIC iceberg::iceberg) target_link_libraries( velox_hive_connector - velox_common_io - velox_connector - velox_dwio_catalog_fbhive - velox_dwio_dwrf_reader - velox_dwio_dwrf_writer - velox_dwio_parquet_reader - velox_dwio_parquet_writer - velox_file - velox_hive_partition_function - velox_s3fs - velox_hdfs - velox_gcs) + PRIVATE velox_common_io + velox_connector + velox_dwio_catalog_fbhive + velox_dwio_dwrf_reader + velox_dwio_dwrf_writer + velox_dwio_parquet_reader + velox_dwio_parquet_writer + velox_file + velox_hive_partition_function + velox_s3fs + velox_hdfs + velox_gcs + PUBLIC velox_hive_iceberg_splitreader) add_library(velox_hive_partition_function HivePartitionFunction.cpp) target_link_libraries(velox_hive_partition_function velox_core velox_exec) +# add_subdirectory(iceberg) add_subdirectory(storage_adapters) if(${VELOX_BUILD_TESTING}) diff --git a/velox/connectors/hive/FileHandle.h b/velox/connectors/hive/FileHandle.h index 15edd9d2ac2f4..9482051fcb3b0 100644 --- a/velox/connectors/hive/FileHandle.h +++ b/velox/connectors/hive/FileHandle.h @@ -25,15 +25,15 @@ #pragma once -#include -#include -#include - #include "velox/common/caching/CachedFactory.h" #include "velox/common/caching/FileIds.h" #include "velox/common/file/File.h" #include "velox/dwio/common/InputStream.h" +//#include +//#include +//#include + namespace facebook::velox { class Config; diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index 184d40cd981ac..be753d2f387de 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -75,7 +75,6 @@ std::unique_ptr HiveConnector::createDataSource( std::string, std::shared_ptr>& columnHandles, ConnectorQueryCtx* connectorQueryCtx) { - return std::make_unique( outputType, tableHandle, diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 68452b1c5a627..3f0a72d37ded9 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -501,8 +501,8 @@ std::unique_ptr HiveDataSource::createSplitReader() { readerOutputType_, &partitionKeys_, fileHandleFactory_, - executor_, connectorQueryCtx_, + executor_, ioStats_); } @@ -518,7 +518,6 @@ void HiveDataSource::addSplit(std::shared_ptr split) { if (splitReader_) { splitReader_.reset(); } - splitReader_ = createSplitReader(); splitReader_->prepareSplit(metadataFilter_, runtimeStats_); } diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index 3a1f0f6952929..5d9f7c10ef84e 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -141,9 +141,9 @@ class HiveDataSource : public DataSource { SelectivityVector filterRows_; exec::FilterEvalCtx filterEvalCtx_; - FileHandleFactory* fileHandleFactory_; - const ConnectorQueryCtx* const connectorQueryCtx_; - folly::Executor* executor_; + FileHandleFactory* const fileHandleFactory_; + ConnectorQueryCtx* const connectorQueryCtx_; + folly::Executor* const executor_; }; } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index f8e5a8f8cffe7..ccb2203dbbabb 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -19,12 +19,13 @@ #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/TableHandle.h" +#include "velox/connectors/hive/iceberg/IcebergSplitReader.h" #include "velox/dwio/common/CachedBufferedInput.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/ReaderFactory.h" #include -#include +// #include #include #include @@ -144,19 +145,34 @@ std::unique_ptr SplitReader::create( std::unordered_map>* partitionKeys, FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, folly::Executor* executor, - const ConnectorQueryCtx* connectorQueryCtx, std::shared_ptr ioStats) { - return std::make_unique( - hiveSplit, - hiveTableHandle, - scanSpec, - readerOutputType, - partitionKeys, - fileHandleFactory, - executor, - connectorQueryCtx, - ioStats); + // Create the SplitReader based on hiveSplit->customSplitInfo["table_format"] + if (hiveSplit->customSplitInfo["table_format"] == "hive_iceberg") { + return std::make_unique( + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + + connectorQueryCtx, + executor, + ioStats); + } else { + return std::make_unique( + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + connectorQueryCtx, + executor, + ioStats); + } } SplitReader::SplitReader( @@ -167,8 +183,8 @@ SplitReader::SplitReader( std::unordered_map>* partitionKeys, FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, folly::Executor* executor, - const ConnectorQueryCtx* connectorQueryCtx, std::shared_ptr ioStats) : hiveSplit_(hiveSplit), hiveTableHandle_(hiveTableHandle), @@ -176,8 +192,8 @@ SplitReader::SplitReader( readerOutputType_(readerOutputType), partitionKeys_(partitionKeys), fileHandleFactory_(fileHandleFactory), - executor_(executor), connectorQueryCtx_(connectorQueryCtx), + executor_(executor), ioStats_(ioStats), baseReaderOpts_(connectorQueryCtx->memoryPool()) {} diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index 52417c6425711..0f7fabd7b0f36 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -51,8 +51,8 @@ class SplitReader { std::unordered_map>* partitionKeys, FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, folly::Executor* executor, - const ConnectorQueryCtx* connectorQueryCtx, std::shared_ptr ioStats); SplitReader( @@ -63,8 +63,8 @@ class SplitReader { std::unordered_map>* partitionKeys, FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, folly::Executor* executor, - const ConnectorQueryCtx* connectorQueryCtx, std::shared_ptr ioStats); virtual ~SplitReader() = default; @@ -122,8 +122,8 @@ class SplitReader { std::unique_ptr baseReader_; std::unique_ptr baseRowReader_; FileHandleFactory* const fileHandleFactory_; + ConnectorQueryCtx* const connectorQueryCtx_; folly::Executor* const executor_; - const ConnectorQueryCtx* const connectorQueryCtx_; std::shared_ptr ioStats_; private: diff --git a/velox/connectors/hive/benchmarks/CMakeLists.txt b/velox/connectors/hive/benchmarks/CMakeLists.txt index ddc3e0cf67caf..73d17964ac88c 100644 --- a/velox/connectors/hive/benchmarks/CMakeLists.txt +++ b/velox/connectors/hive/benchmarks/CMakeLists.txt @@ -27,6 +27,7 @@ target_link_libraries( velox_exec velox_exec_test_lib velox_hive_connector + velox_hive_iceberg_splitreader velox_hive_partition_function velox_memory Folly::folly diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt new file mode 100644 index 0000000000000..79993326fd8cb --- /dev/null +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -0,0 +1,37 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# add_library(velox_hive_iceberg_splitreader OBJECT IMPORTED GLOBAL +# IcebergSplitReader.cpp IcebergSplit.cpp DeleteFileReader.cpp) + +add_library(velox_hive_iceberg_splitreader OBJECT + IcebergSplitReader.cpp IcebergSplit.cpp DeleteFileReader.cpp) + +target_link_libraries( + velox_hive_iceberg_splitreader + Folly::folly + gtest + Boost::headers + Boost::filesystem + gflags::gflags + glog::glog + gtest + gtest_main + xsimd) + +# install(TARGETS velox_hive_iceberg_splitreader EXPORT iceberg) message (STATUS +# "Installed velox_hive_iceberg_splitreader") install(EXPORT iceberg FILE +# icebergConfig.cmake NAMESPACE iceberg:: DESTINATION lib/cmake/iceberg) + +add_subdirectory(tests) diff --git a/velox/connectors/hive/iceberg/DeleteFileReader.cpp b/velox/connectors/hive/iceberg/DeleteFileReader.cpp new file mode 100644 index 0000000000000..143f2e73609b5 --- /dev/null +++ b/velox/connectors/hive/iceberg/DeleteFileReader.cpp @@ -0,0 +1,192 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/DeleteFileReader.h" + +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" + +namespace facebook::velox::connector::hive::iceberg { +DeleteFileReader::DeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::string& baseFilePath, + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + ConnectorQueryCtx* connectorQueryCtx, + uint64_t splitOffset, + const std::string& connectorId) + : deleteFile_(deleteFile), + baseFilePath_(baseFilePath), + fileHandleFactory_(fileHandleFactory), + executor_(executor), + connectorQueryCtx_(connectorQueryCtx), + splitOffset_(splitOffset), + deletePositionsVector_(nullptr), + deletePositionsOffset_(-1), + endOfFile_(false) { + if (deleteFile.content == FileContent::kPositionalDeletes) { + createPositionalDeleteDataSource(deleteFile, connectorId); + } else if (deleteFile.content == FileContent::kEqualityDeletes) { + VELOX_NYI("Iceberg equality delete files are not supported yet."); + } else { + VELOX_FAIL("Unrecogonized Iceberg delete file type: ", deleteFile.content); + } +} + +void DeleteFileReader::createPositionalDeleteDataSource( + const IcebergDeleteFile& deleteFile, + const std::string& connectorId) { + auto filePathColumn = ICEBERG_DELETE_FILE_PATH_COLUMN(); + auto positionsColumn = ICEBERG_DELETE_FILE_POSITIONS_COLUMN(); + + std::vector deleteColumnNames( + {filePathColumn->name, positionsColumn->name}); + std::vector> deleteColumnTypes( + {filePathColumn->type, positionsColumn->type}); + + RowTypePtr deleteRowType = + ROW(std::move(deleteColumnNames), std::move(deleteColumnTypes)); + + std::unordered_map> + deleteColumnHandles; + deleteColumnHandles[filePathColumn->name] = + std::make_shared( + filePathColumn->name, + HiveColumnHandle::ColumnType::kRegular, + VARCHAR(), + VARCHAR()); + deleteColumnHandles[positionsColumn->name] = + std::make_shared( + positionsColumn->name, + HiveColumnHandle::ColumnType::kRegular, + BIGINT(), + BIGINT()); + + size_t lastPathDelimiterPos = deleteFile.filePath.find_last_of('/'); + std::string deleteFileName = deleteFile.filePath.substr( + lastPathDelimiterPos, deleteFile.filePath.size() - lastPathDelimiterPos); + + // TODO: Build filters on the path column: filePathColumn = baseFilePath_ + // TODO: Build filters on the positionsColumn: + // positionsColumn >= baseReadOffset_ + splitOffsetInFile + SubfieldFilters subfieldFilters = {}; + + auto deleteTableHandle = std::make_shared( + connectorId, deleteFileName, false, std::move(subfieldFilters), nullptr); + + auto deleteSplit = std::make_shared( + connectorId, + deleteFile.filePath, + deleteFile.fileFormat, + 0, + deleteFile.fileSizeInBytes); + + deleteDataSource_ = std::make_unique( + deleteRowType, + deleteTableHandle, + deleteColumnHandles, + fileHandleFactory_, + executor_, + connectorQueryCtx_); + + deleteDataSource_->addSplit(deleteSplit); +} + +void DeleteFileReader::readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + int8_t* deleteBitmap) { + ContinueFuture blockingFuture(ContinueFuture::makeEmpty()); + // We are going to read to the row number up to the end of the batch. For the + // same base file, the deleted rows are in ascending order in the same delete + // file + int64_t rowNumberUpperBound = splitOffset_ + baseReadOffset + size; + + // Finish unused delete positions from last batch + if (deletePositionsVector_ && + deletePositionsOffset_ < deletePositionsVector_->size()) { + readDeletePositionsVec(baseReadOffset, rowNumberUpperBound, deleteBitmap); + + if (readFinishedForBatch(rowNumberUpperBound)) { + return; + } + } + + uint64_t numRowsToRead = std::min(size, deleteFile_.recordCount); + while (true) { + std::optional deletesResult = + deleteDataSource_->next(numRowsToRead, blockingFuture); + + if (!deletesResult.has_value()) { + return; + } + + auto data = deletesResult.value(); + if (data) { + if (data->size() > 0) { + VELOX_CHECK(data->childrenSize() > 0); + data->loadedVector(); + + deletePositionsVector_ = data->childAt(1); + deletePositionsOffset_ = 0; + + readDeletePositionsVec( + baseReadOffset, rowNumberUpperBound, deleteBitmap); + + if (readFinishedForBatch(rowNumberUpperBound)) { + return; + } + } + continue; + } else { + endOfFile_ = true; + return; + } + } +} + +bool DeleteFileReader::endOfFile() { + return endOfFile_; +} + +void DeleteFileReader::readDeletePositionsVec( + uint64_t baseReadOffset, + int64_t rowNumberUpperBound, + int8_t* deleteBitmap) { + // Convert the positions in file into positions relative to the start of the + // split. + const int64_t* deletePositions = + deletePositionsVector_->as>()->rawValues(); + int64_t offset = baseReadOffset + splitOffset_; + while (deletePositionsOffset_ < deletePositionsVector_->size() && + deletePositions[deletePositionsOffset_] < rowNumberUpperBound) { + bits::setBit( + deleteBitmap, deletePositions[deletePositionsOffset_] - offset); + deletePositionsOffset_++; + } +} + +bool DeleteFileReader::readFinishedForBatch(int64_t rowNumberUpperBound) { + const int64_t* deletePositions = + deletePositionsVector_->as>()->rawValues(); + if (deletePositionsOffset_ < deletePositionsVector_->size() && + deletePositions[deletePositionsOffset_] >= rowNumberUpperBound) { + return true; + } + return false; +} + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/DeleteFileReader.h b/velox/connectors/hive/iceberg/DeleteFileReader.h new file mode 100644 index 0000000000000..140b6f3d8889f --- /dev/null +++ b/velox/connectors/hive/iceberg/DeleteFileReader.h @@ -0,0 +1,84 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/connectors/hive/FileHandle.h" +#include "velox/connectors/hive/HiveDataSource.h" + +#include +#include + +namespace facebook::velox::cache { +class AsyncDataCache; +} + +namespace facebook::velox::connector { +class ConnectorQueryCtx; +} + +namespace facebook::velox::core { +class ExpressionEvaluator; +} + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; + +class DeleteFileReader { + public: + DeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::string& baseFilePath, + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + ConnectorQueryCtx* connectorQueryCtx, + uint64_t splitOffset, + const std::string& connectorId); + + void readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + int8_t* deleteBitmap); + + bool endOfFile(); + + private: + void createPositionalDeleteDataSource( + const IcebergDeleteFile& deleteFile, + const std::string& connectorId); + + void readDeletePositionsVec( + uint64_t baseReadOffset, + int64_t rowNumberUpperBound, + int8_t* deleteBitmap); + + bool readFinishedForBatch(int64_t rowNumberUpperBound); + + const IcebergDeleteFile& deleteFile_; + const std::string& baseFilePath_; + FileHandleFactory* const fileHandleFactory_; + folly::Executor* const executor_; + ConnectorQueryCtx* const connectorQueryCtx_; + uint64_t splitOffset_; + + std::unique_ptr deleteDataSource_; + VectorPtr deletePositionsVector_; + uint64_t deletePositionsOffset_; + bool endOfFile_; +}; + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergDeleteFile.h b/velox/connectors/hive/iceberg/IcebergDeleteFile.h new file mode 100644 index 0000000000000..1cff2b8efbe26 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergDeleteFile.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/dwio/common/Options.h" + +#include +#include +#include + +namespace facebook::velox::connector::hive::iceberg { + +enum class FileContent { + kData, + kPositionalDeletes, + kEqualityDeletes, +}; + +struct IcebergDeleteFile { + FileContent content; + const std::string filePath; + dwio::common::FileFormat fileFormat; + uint64_t recordCount; + uint64_t fileSizeInBytes; + // The field ids for the delete columns for equality delete files + std::vector equalityFieldIds; + // The lower bounds of the in-file positions for the deleted rows, identified + // by each column's field id. E.g. The deleted rows for a column with field id + // 1 is in range [10, 50], where 10 and 50 are the deleted row positions in + // the data file, then lowerBounds would contain entry <1, "10"> + std::unordered_map lowerBounds; + // The upper bounds of the in-file positions for the deleted rows, identified + // by each column's field id. E.g. The deleted rows for a column with field id + // 1 is in range [10, 50], then upperBounds will contain entry <1, "50"> + std::unordered_map upperBounds; + + IcebergDeleteFile( + FileContent _content, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _recordCount, + uint64_t _fileSizeInBytes, + std::vector _equalityFieldIds = {}, + std::unordered_map _lowerBounds = {}, + std::unordered_map _upperBounds = {}) + : content(_content), + filePath(_filePath), + fileFormat(_fileFormat), + recordCount(_recordCount), + fileSizeInBytes(_fileSizeInBytes), + equalityFieldIds(_equalityFieldIds), + lowerBounds(_lowerBounds), + upperBounds(_upperBounds) {} +}; + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergMetadataColumns.h b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h new file mode 100644 index 0000000000000..d4442321e53ac --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/type/Type.h" + +#include + +namespace facebook::velox::connector::hive::iceberg { + +const std::string kIcebergDeleteFilePathColumn = "file_path"; +const std::string kIcebergRowPositionColumn = "pos"; + +struct IcebergMetadataColumn { + int id; + std::string name; + std::shared_ptr type; + std::string doc; + + IcebergMetadataColumn( + int _id, + const std::string& _name, + std::shared_ptr _type, + const std::string& _doc) + : id(_id), name(_name), type(_type), doc(_doc) {} +}; + +#define ICEBERG_DELETE_FILE_PATH_COLUMN() \ + std::make_shared( \ + 2147483546, \ + kIcebergDeleteFilePathColumn, \ + VARCHAR(), \ + "Path of a file in which a deleted row is stored"); + +#define ICEBERG_DELETE_FILE_POSITIONS_COLUMN() \ + std::make_shared( \ + 2147483545, \ + kIcebergRowPositionColumn, \ + BIGINT(), \ + "Ordinal position of a deleted row in the data file"); + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergSplit.cpp b/velox/connectors/hive/iceberg/IcebergSplit.cpp new file mode 100644 index 0000000000000..fe256b073b175 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplit.cpp @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/IcebergSplit.h" + +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" + +namespace facebook::velox::connector::hive::iceberg { + +HiveIcebergSplit::HiveIcebergSplit( + const std::string& _connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start, + uint64_t _length, + const std::unordered_map>& + _partitionKeys, + std::optional _tableBucketNumber, + const std::unordered_map& _customSplitInfo, + const std::shared_ptr& _extraFileInfo) + : HiveConnectorSplit( + _connectorId, + _filePath, + _fileFormat, + _start, + _length, + _partitionKeys, + _tableBucketNumber) { + // TODO: Deserialize _extraFileInfo to get deleteFiles; +} + +// For tests only +HiveIcebergSplit::HiveIcebergSplit( + const std::string& _connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start, + uint64_t _length, + const std::unordered_map>& + _partitionKeys, + std::optional _tableBucketNumber, + const std::unordered_map& _customSplitInfo, + const std::shared_ptr& _extraFileInfo, + std::vector _deletes) + : HiveConnectorSplit( + _connectorId, + _filePath, + _fileFormat, + _start, + _length, + _partitionKeys, + _tableBucketNumber, + _customSplitInfo, + _extraFileInfo), + deleteFiles(_deletes) {} +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergSplit.h b/velox/connectors/hive/iceberg/IcebergSplit.h new file mode 100644 index 0000000000000..25f1e6a4e4b16 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplit.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/connectors/hive/HiveConnectorSplit.h" + +#include + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; + +struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { + std::vector deleteFiles; + + HiveIcebergSplit( + const std::string& connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start = 0, + uint64_t _length = std::numeric_limits::max(), + const std::unordered_map>& + _partitionKeys = {}, + std::optional _tableBucketNumber = std::nullopt, + const std::unordered_map& _customSplitInfo = {}, + const std::shared_ptr& _extraFileInfo = {}); + + // For tests only + HiveIcebergSplit( + const std::string& connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start = 0, + uint64_t _length = std::numeric_limits::max(), + const std::unordered_map>& + _partitionKeys = {}, + std::optional _tableBucketNumber = std::nullopt, + const std::unordered_map& _customSplitInfo = {}, + const std::shared_ptr& _extraFileInfo = {}, + std::vector deletes = {}); +}; + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp new file mode 100644 index 0000000000000..5c4cbea344ec3 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -0,0 +1,105 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/IcebergSplitReader.h" + +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/dwio/common/BufferUtil.h" + +using namespace facebook::velox::dwio::common; + +namespace facebook::velox::connector::hive::iceberg { + +IcebergSplitReader::IcebergSplitReader( + std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, + const RowTypePtr readerOutputType, + std::unordered_map>* + partitionKeys, + FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + std::shared_ptr ioStats) + : SplitReader( + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + connectorQueryCtx, + executor, + ioStats) {} + +void IcebergSplitReader::prepareSplit( + std::shared_ptr metadataFilter, + dwio::common::RuntimeStatistics& runtimeStats) { + SplitReader::prepareSplit(metadataFilter, runtimeStats); + baseReadOffset_ = 0; + deleteFileReaders_.clear(); + splitOffset_ = baseRowReader_->nextRowNumber(); + + // TODO: Deserialize the std::vector deleteFiles. For now + // we assume it's already deserialized. + std::shared_ptr icebergSplit = + std::dynamic_pointer_cast(hiveSplit_); + + const auto& deleteFiles = icebergSplit->deleteFiles; + for (const auto& deleteFile : deleteFiles) { + deleteFileReaders_.push_back(std::make_unique( + deleteFile, + hiveSplit_->filePath, + fileHandleFactory_, + executor_, + connectorQueryCtx_, + splitOffset_, + hiveSplit_->connectorId)); + } +} + +uint64_t IcebergSplitReader::next(int64_t size, VectorPtr& output) { + Mutation mutation; + mutation.deletedRows = nullptr; + + if (!deleteFileReaders_.empty()) { + auto numBytes = bits::nbytes(size); + dwio::common::ensureCapacity( + deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool()); + std::memset((void*)deleteBitmap_->as(), 0L, numBytes); + + for (auto iter = deleteFileReaders_.begin(); + iter != deleteFileReaders_.end(); + iter++) { + (*iter)->readDeletePositions( + baseReadOffset_, size, deleteBitmap_->asMutable()); + if ((*iter)->endOfFile()) { + iter = deleteFileReaders_.erase(iter); + } + } + + deleteBitmap_->setSize(numBytes); + mutation.deletedRows = deleteBitmap_->as(); + } + + auto rowsScanned = baseRowReader_->next(size, output, &mutation); + baseReadOffset_ += rowsScanned; + + return rowsScanned; +} + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h new file mode 100644 index 0000000000000..520ff5e9aa4be --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/connectors/hive/SplitReader.h" +#include "velox/connectors/hive/iceberg/DeleteFileReader.h" + +namespace facebook::velox::connector { +class ColumnHandle; +} + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; + +class IcebergSplitReader : public SplitReader { + public: + IcebergSplitReader( + std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, + const RowTypePtr readerOutputType, + std::unordered_map>* + partitionKeys, + FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + std::shared_ptr ioStats); + + ~IcebergSplitReader() override = default; + + void prepareSplit( + std::shared_ptr metadataFilter, + dwio::common::RuntimeStatistics& runtimeStats) override; + + uint64_t next(int64_t size, VectorPtr& output) override; + + private: + // The read offset to the beginning of the split in number of rows for the + // current batch for the base data file + uint64_t baseReadOffset_; + // The file position for the first row in the split + uint64_t splitOffset_; + std::list> deleteFileReaders_; + BufferPtr deleteBitmap_; +}; +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt new file mode 100644 index 0000000000000..b834346ed37bd --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt @@ -0,0 +1,28 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +add_executable(velox_hive_iceberg_test IcebergReadTest.cpp) +add_test(velox_hive_iceberg_test velox_hive_iceberg_test) + +target_link_libraries( + velox_hive_iceberg_test + velox_hive_connector + velox_hive_iceberg_splitreader + velox_hive_partition_function + velox_dwio_common_exception + velox_dwio_common_test_utils + velox_vector_test_lib + velox_exec + velox_exec_test_lib + gtest + gtest_main) diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp new file mode 100644 index 0000000000000..4fd80befb31d9 --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -0,0 +1,208 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/exec/PlanNodeStats.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +using namespace facebook::velox::exec::test; +using namespace facebook::velox::exec; +using namespace facebook::velox::dwio::common; +using namespace facebook::velox::test; + +namespace facebook::velox::connector::hive::iceberg { + +class HiveIcebergTest : public HiveConnectorTestBase { + public: + void assertPositionalDeletes(const std::vector& deleteRows) { + assertPositionalDeletes( + deleteRows, + "SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRows) + + ")"); + } + void assertPositionalDeletes( + const std::vector& deleteRows, + std::string duckdbSql) { + std::shared_ptr dataFilePath = writeDataFile(rowCount); + std::shared_ptr deleteFilePath = + writePositionDeleteFile(dataFilePath->path, deleteRows); + + IcebergDeleteFile deleteFile( + FileContent::kPositionalDeletes, + deleteFilePath->path, + fileFomat_, + deleteRows.size(), + testing::internal::GetFileSize( + std::fopen(deleteFilePath->path.c_str(), "r"))); + + auto icebergSplit = makeIcebergSplit(dataFilePath->path, {deleteFile}); + + auto plan = tableScanNode(); + auto task = OperatorTestBase::assertQuery(plan, {icebergSplit}, duckdbSql); + + auto planStats = toPlanStats(task->taskStats()); + auto scanNodeId = plan->id(); + auto it = planStats.find(scanNodeId); + ASSERT_TRUE(it != planStats.end()); + ASSERT_TRUE(it->second.peakMemoryBytes > 0); + } + + std::vector makeRandomDeleteRows(int32_t maxRowNumber) { + std::mt19937 gen{0}; + std::vector deleteRows; + for (int i = 0; i < maxRowNumber; i++) { + if (folly::Random::rand32(0, 10, gen) > 8) { + deleteRows.push_back(i); + } + } + return deleteRows; + } + + std::vector makeSequenceRows(int32_t maxRowNumber) { + std::vector deleteRows; + deleteRows.resize(maxRowNumber); + std::iota(deleteRows.begin(), deleteRows.end(), 0); + return deleteRows; + } + + const static int rowCount = 20000; + + private: + std::shared_ptr makeIcebergSplit( + const std::string& dataFilePath, + const std::vector& deleteFiles = {}) { + std::unordered_map> partitionKeys; + std::unordered_map customSplitInfo; + customSplitInfo["table_format"] = "hive_iceberg"; + + auto file = filesystems::getFileSystem(dataFilePath, nullptr) + ->openFileForRead(dataFilePath); + const int64_t fileSize = file->size(); + + return std::make_shared( + kHiveConnectorId, + dataFilePath, + fileFomat_, + 0, + fileSize, + partitionKeys, + std::nullopt, + customSplitInfo, + nullptr, + deleteFiles); + } + + std::vector makeVectors(int32_t count, int32_t rowsPerVector) { + std::vector vectors; + + for (int i = 0; i < count; i++) { + auto data = makeSequenceRows(rowsPerVector); + VectorPtr c0 = vectorMaker_.flatVector(data); + vectors.push_back(makeRowVector({"c0"}, {c0})); + } + + return vectors; + } + + std::shared_ptr writeDataFile(uint64_t numRows) { + auto dataVectors = makeVectors(1, numRows); + + auto dataFilePath = TempFilePath::create(); + writeToFile(dataFilePath->path, dataVectors); + createDuckDbTable(dataVectors); + return dataFilePath; + } + + std::shared_ptr writePositionDeleteFile( + const std::string& dataFilePath, + const std::vector& deleteRows) { + uint32_t numDeleteRows = deleteRows.size(); + + auto child = vectorMaker_.flatVector(std::vector{1UL}); + + auto filePathVector = vectorMaker_.flatVector( + numDeleteRows, [&](auto row) { return StringView(dataFilePath); }); + auto deletePositionsVector = vectorMaker_.flatVector(deleteRows); + RowVectorPtr deleteFileVectors = makeRowVector( + {pathColumn_->name, posColumn_->name}, + {filePathVector, deletePositionsVector}); + + auto deleteFilePath = TempFilePath::create(); + writeToFile(deleteFilePath->path, deleteFileVectors); + + return deleteFilePath; + } + + std::string makeNotInList(const std::vector& deleteRows) { + if (deleteRows.empty()) { + return ""; + } + + return std::accumulate( + deleteRows.begin() + 1, + deleteRows.end(), + std::to_string(deleteRows[0]), + [](const std::string& a, int64_t b) { + return a + ", " + std::to_string(b); + }); + } + + std::shared_ptr assertQuery( + const core::PlanNodePtr& plan, + std::shared_ptr dataFilePath, + const std::vector& deleteFiles, + const std::string& duckDbSql) { + auto icebergSplit = makeIcebergSplit(dataFilePath->path, deleteFiles); + return OperatorTestBase::assertQuery(plan, {icebergSplit}, duckDbSql); + } + + core::PlanNodePtr tableScanNode() { + return PlanBuilder(pool_.get()).tableScan(rowType_).planNode(); + } + + private: + dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF}; + RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})}; + std::shared_ptr pathColumn_ = + ICEBERG_DELETE_FILE_PATH_COLUMN(); + std::shared_ptr posColumn_ = + ICEBERG_DELETE_FILE_POSITIONS_COLUMN(); +}; + +TEST_F(HiveIcebergTest, positionalDeletes) { + // Delete row 0, 1, 2, 3 from the first batch out of two. + assertPositionalDeletes({0, 1, 2, 3}); + // Delete the first and last row in each batch (10000 rows per batch) + assertPositionalDeletes({0, 9999, 10000, 19999}); + // Delete several rows in the second batch (10000 rows per batch) + assertPositionalDeletes({10000, 10002, 19999}); + // Delete random rows + assertPositionalDeletes(makeRandomDeleteRows(rowCount)); + // Delete 0 rows + assertPositionalDeletes({}, "SELECT * FROM tmp"); + // Delete all rows + assertPositionalDeletes( + makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0"); + // Delete rows that don't exist + assertPositionalDeletes({20000, 29999}); +} + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/storage_adapters/gcs/examples/CMakeLists.txt b/velox/connectors/hive/storage_adapters/gcs/examples/CMakeLists.txt index 29f9342874604..7e09e65a358da 100644 --- a/velox/connectors/hive/storage_adapters/gcs/examples/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/gcs/examples/CMakeLists.txt @@ -20,5 +20,6 @@ target_link_libraries( velox_gcs velox_core velox_hive_connector + velox_hive_iceberg_splitreader velox_dwio_common_exception velox_exec) diff --git a/velox/connectors/hive/storage_adapters/gcs/tests/CMakeLists.txt b/velox/connectors/hive/storage_adapters/gcs/tests/CMakeLists.txt index b44aa355c09fc..22d08f586e665 100644 --- a/velox/connectors/hive/storage_adapters/gcs/tests/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/gcs/tests/CMakeLists.txt @@ -20,6 +20,7 @@ target_link_libraries( velox_gcs velox_core velox_hive_connector + velox_hive_iceberg_splitreader velox_dwio_common_exception velox_exec gmock diff --git a/velox/connectors/hive/storage_adapters/hdfs/tests/CMakeLists.txt b/velox/connectors/hive/storage_adapters/hdfs/tests/CMakeLists.txt index f0bb039ac7137..579ae3495c303 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/tests/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/hdfs/tests/CMakeLists.txt @@ -24,6 +24,7 @@ target_link_libraries( velox_core velox_exec_test_lib velox_hive_connector + velox_hive_iceberg_splitreader velox_dwio_common_exception velox_exec gtest diff --git a/velox/connectors/hive/tests/CMakeLists.txt b/velox/connectors/hive/tests/CMakeLists.txt index f3b348ca1767b..056e4fe2a83c6 100644 --- a/velox/connectors/hive/tests/CMakeLists.txt +++ b/velox/connectors/hive/tests/CMakeLists.txt @@ -26,6 +26,7 @@ add_test(velox_hive_connector_test velox_hive_connector_test) target_link_libraries( velox_hive_connector_test velox_hive_connector + velox_hive_iceberg_splitreader velox_hive_partition_function velox_dwio_common_exception velox_vector_fuzzer diff --git a/velox/dwio/dwrf/test/CMakeLists.txt b/velox/dwio/dwrf/test/CMakeLists.txt index 48341173e2165..99107b190684d 100644 --- a/velox/dwio/dwrf/test/CMakeLists.txt +++ b/velox/dwio/dwrf/test/CMakeLists.txt @@ -347,6 +347,8 @@ add_test(velox_dwrf_e2e_filter_test velox_dwrf_e2e_filter_test) target_link_libraries( velox_dwrf_e2e_filter_test velox_e2e_filter_test_base + velox_hive_connector + velox_hive_iceberg_splitreader velox_link_libs Folly::folly fmt::fmt diff --git a/velox/dwio/parquet/tests/CMakeLists.txt b/velox/dwio/parquet/tests/CMakeLists.txt index bc595b3c5baa1..e7aae92485850 100644 --- a/velox/dwio/parquet/tests/CMakeLists.txt +++ b/velox/dwio/parquet/tests/CMakeLists.txt @@ -37,6 +37,7 @@ target_link_libraries( velox_exec_test_lib velox_exec velox_hive_connector + velox_hive_iceberg_splitreader velox_tpch_connector velox_aggregates velox_tpch_gen diff --git a/velox/dwio/parquet/tests/reader/CMakeLists.txt b/velox/dwio/parquet/tests/reader/CMakeLists.txt index ff8aa8a9c27e4..8e27e59827e7a 100644 --- a/velox/dwio/parquet/tests/reader/CMakeLists.txt +++ b/velox/dwio/parquet/tests/reader/CMakeLists.txt @@ -51,6 +51,7 @@ target_link_libraries( velox_exec_test_lib velox_exec velox_hive_connector + # velox_hive_iceberg_splitreader Folly::folly ${FOLLY_BENCHMARK}) @@ -81,6 +82,7 @@ target_link_libraries( velox_exec_test_lib velox_exec velox_hive_connector + velox_hive_iceberg_splitreader velox_link_libs ${TEST_LINK_LIBS}) diff --git a/velox/examples/CMakeLists.txt b/velox/examples/CMakeLists.txt index 8625d34ce82ee..2fd6de3954291 100644 --- a/velox/examples/CMakeLists.txt +++ b/velox/examples/CMakeLists.txt @@ -45,6 +45,7 @@ target_link_libraries( velox_exec velox_exec_test_lib velox_hive_connector + velox_hive_iceberg_splitreader velox_memory) add_executable(velox_example_vector_reader_writer VectorReaderWriter.cpp) diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index e9287a7a494f9..1f398a41081e6 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -122,6 +122,7 @@ target_link_libraries( velox_functions_prestosql velox_functions_test_lib velox_hive_connector + velox_hive_iceberg_splitreader velox_memory velox_serialization velox_test_util @@ -156,6 +157,7 @@ target_link_libraries( velox_functions_prestosql velox_functions_test_lib velox_hive_connector + velox_hive_iceberg_splitreader velox_memory velox_serialization velox_test_util @@ -199,6 +201,7 @@ target_link_libraries( velox_aggregation_fuzzer_test velox_aggregation_fuzzer velox_aggregates + velox_hive_iceberg_splitreader velox_window velox_vector_test_lib gtest diff --git a/velox/exec/tests/utils/CMakeLists.txt b/velox/exec/tests/utils/CMakeLists.txt index bcdb2a0303c5d..2943bdd85ac48 100644 --- a/velox/exec/tests/utils/CMakeLists.txt +++ b/velox/exec/tests/utils/CMakeLists.txt @@ -45,6 +45,7 @@ target_link_libraries( velox_dwio_common_test_utils velox_type_fbhive velox_hive_connector + velox_hive_iceberg_splitreader velox_tpch_connector velox_presto_serializer velox_functions_prestosql diff --git a/velox/experimental/codegen/benchmark/CMakeLists.txt b/velox/experimental/codegen/benchmark/CMakeLists.txt index 8541b47383ba5..2260b47974f3a 100644 --- a/velox/experimental/codegen/benchmark/CMakeLists.txt +++ b/velox/experimental/codegen/benchmark/CMakeLists.txt @@ -23,6 +23,7 @@ target_link_libraries( velox_codegen_utils_resource_path velox_codegen_code_generator velox_hive_connector + velox_hive_iceberg_splitreader ${FOLLY_BENCHMARK} Folly::folly Boost::filesystem @@ -41,7 +42,8 @@ add_test(velox_codegen_benchmark_single_output velox_codegen_benchmark_single_output) add_executable(velox_codegen_benchmark CodegenBenchmarks.cpp) -target_link_libraries(velox_codegen_benchmark velox_hive_connector) +target_link_libraries(velox_codegen_benchmark velox_hive_connector + velox_hive_iceberg_splitreader) add_compile_definitions(velox_codegen_test BASEFOLDER="${PROJECT_SOURCE_DIR}") add_compile_definitions(velox_codegen_test COMPFLAGS="${CMAKE_CXX_FLAGS}") diff --git a/velox/experimental/codegen/tests/CMakeLists.txt b/velox/experimental/codegen/tests/CMakeLists.txt index b3f91ca078d4a..82c354a93ba6e 100644 --- a/velox/experimental/codegen/tests/CMakeLists.txt +++ b/velox/experimental/codegen/tests/CMakeLists.txt @@ -24,6 +24,7 @@ target_link_libraries( velox_exec_test_lib velox_functions_prestosql velox_hive_connector + velox_hive_iceberg_splitreader velox_codegen_utils_resource_path) add_test(velox_experimental_codegen_test velox_experimental_codegen_test) diff --git a/velox/experimental/codegen/vector_function/tests/CMakeLists.txt b/velox/experimental/codegen/vector_function/tests/CMakeLists.txt index 7f75bec3e314d..6202a23245ca3 100644 --- a/velox/experimental/codegen/vector_function/tests/CMakeLists.txt +++ b/velox/experimental/codegen/vector_function/tests/CMakeLists.txt @@ -29,6 +29,7 @@ target_link_libraries( velox_functions_lib velox_functions_prestosql velox_hive_connector + velox_hive_iceberg_splitreader velox_core velox_type velox_serialization diff --git a/velox/experimental/wave/exec/tests/CMakeLists.txt b/velox/experimental/wave/exec/tests/CMakeLists.txt index 74feddf6cf066..a0d77114e988d 100644 --- a/velox/experimental/wave/exec/tests/CMakeLists.txt +++ b/velox/experimental/wave/exec/tests/CMakeLists.txt @@ -34,6 +34,7 @@ target_link_libraries( velox_functions_prestosql velox_functions_test_lib velox_hive_connector + velox_hive_iceberg_splitreader velox_memory velox_serialization velox_test_util diff --git a/velox/expression/tests/CMakeLists.txt b/velox/expression/tests/CMakeLists.txt index 7d2c68de03d4a..d8d921c9ad841 100644 --- a/velox/expression/tests/CMakeLists.txt +++ b/velox/expression/tests/CMakeLists.txt @@ -64,6 +64,7 @@ target_link_libraries( velox_expression_test velox_aggregates velox_hive_connector + velox_hive_iceberg_splitreader velox_dwio_common velox_dwio_common_exception velox_exec_test_lib @@ -119,8 +120,13 @@ target_link_libraries(velox_expression_runner velox_expression_verifier add_executable(velox_expression_runner_test ExpressionRunnerTest.cpp) target_link_libraries( - velox_expression_runner_test velox_expression_runner velox_exec_test_lib - velox_function_registry gtest gtest_main) + velox_expression_runner_test + velox_expression_runner + velox_exec_test_lib + velox_hive_iceberg_splitreader + velox_function_registry + gtest + gtest_main) add_executable(velox_expression_verifier_unit_test ExpressionVerifierUnitTest.cpp) diff --git a/velox/functions/prestosql/aggregates/benchmarks/CMakeLists.txt b/velox/functions/prestosql/aggregates/benchmarks/CMakeLists.txt index 92b0ec8d9663d..53a54d9f7b97a 100644 --- a/velox/functions/prestosql/aggregates/benchmarks/CMakeLists.txt +++ b/velox/functions/prestosql/aggregates/benchmarks/CMakeLists.txt @@ -17,6 +17,7 @@ target_link_libraries( velox_aggregates_simple_aggregates_bm velox_aggregates velox_hive_connector + velox_hive_iceberg_splitreader velox_functions_lib velox_exec_test_lib velox_functions_prestosql @@ -33,6 +34,7 @@ target_link_libraries( velox_aggregates_reduce_agg_bm velox_aggregates velox_hive_connector + velox_hive_iceberg_splitreader velox_functions_lib velox_exec_test_lib velox_functions_prestosql diff --git a/velox/functions/prestosql/aggregates/tests/CMakeLists.txt b/velox/functions/prestosql/aggregates/tests/CMakeLists.txt index ae3d6764c4258..44618acad8220 100644 --- a/velox/functions/prestosql/aggregates/tests/CMakeLists.txt +++ b/velox/functions/prestosql/aggregates/tests/CMakeLists.txt @@ -69,6 +69,7 @@ target_link_libraries( velox_functions_prestosql velox_functions_lib velox_hive_connector + velox_hive_iceberg_splitreader velox_simple_aggregate velox_type velox_vector_fuzzer diff --git a/velox/functions/sparksql/aggregates/tests/CMakeLists.txt b/velox/functions/sparksql/aggregates/tests/CMakeLists.txt index 806d539c5610f..82765e8196bbc 100644 --- a/velox/functions/sparksql/aggregates/tests/CMakeLists.txt +++ b/velox/functions/sparksql/aggregates/tests/CMakeLists.txt @@ -32,6 +32,7 @@ target_link_libraries( velox_functions_aggregates_test_lib velox_functions_spark_aggregates velox_hive_connector + velox_hive_iceberg_splitreader gflags::gflags gtest gtest_main) diff --git a/velox/substrait/tests/CMakeLists.txt b/velox/substrait/tests/CMakeLists.txt index f453fb61b7c43..c785c2b6a99c7 100644 --- a/velox/substrait/tests/CMakeLists.txt +++ b/velox/substrait/tests/CMakeLists.txt @@ -41,6 +41,7 @@ target_link_libraries( velox_functions_lib velox_functions_prestosql velox_hive_connector + velox_hive_iceberg_splitreader velox_type velox_serialization velox_exec_test_lib