diff --git a/velox/common/testutil/tests/CMakeLists.txt b/velox/common/testutil/tests/CMakeLists.txt index 02b7ee5f1045..d232c201d931 100644 --- a/velox/common/testutil/tests/CMakeLists.txt +++ b/velox/common/testutil/tests/CMakeLists.txt @@ -16,11 +16,5 @@ add_executable(velox_test_util_test TestValueTest.cpp SpillConfigTest.cpp) gtest_add_tests(velox_test_util_test "" AUTO) target_link_libraries( - velox_test_util_test - PRIVATE - velox_test_util - velox_exception - velox_spill_config - velox_exec - gtest - gtest_main) + velox_test_util_test PRIVATE velox_test_util velox_exception + velox_spill_config velox_exec gtest gtest_main) diff --git a/velox/connectors/hive/CMakeLists.txt b/velox/connectors/hive/CMakeLists.txt index 5fbd19245169..772aac7d7506 100644 --- a/velox/connectors/hive/CMakeLists.txt +++ b/velox/connectors/hive/CMakeLists.txt @@ -13,9 +13,10 @@ # limitations under the License. add_library(velox_hive_config OBJECT HiveConfig.cpp) - target_link_libraries(velox_hive_config velox_exception) +add_subdirectory(iceberg) + add_library( velox_hive_connector OBJECT FileHandle.cpp @@ -30,18 +31,19 @@ add_library( target_link_libraries( velox_hive_connector - velox_common_io - velox_connector - velox_dwio_catalog_fbhive - velox_dwio_dwrf_reader - velox_dwio_dwrf_writer - velox_dwio_parquet_reader - velox_dwio_parquet_writer - velox_file - velox_hive_partition_function - velox_s3fs - velox_hdfs - velox_gcs) + PUBLIC velox_hive_iceberg_splitreader + PRIVATE velox_common_io + velox_connector + velox_dwio_catalog_fbhive + velox_dwio_dwrf_reader + velox_dwio_dwrf_writer + velox_dwio_parquet_reader + velox_dwio_parquet_writer + velox_file + velox_hive_partition_function + velox_s3fs + velox_hdfs + velox_gcs) add_library(velox_hive_partition_function HivePartitionFunction.cpp) diff --git a/velox/connectors/hive/FileHandle.h b/velox/connectors/hive/FileHandle.h index 15edd9d2ac2f..9482051fcb3b 100644 --- a/velox/connectors/hive/FileHandle.h +++ b/velox/connectors/hive/FileHandle.h @@ -25,15 +25,15 @@ #pragma once -#include -#include -#include - #include "velox/common/caching/CachedFactory.h" #include "velox/common/caching/FileIds.h" #include "velox/common/file/File.h" #include "velox/dwio/common/InputStream.h" +//#include +//#include +//#include + namespace facebook::velox { class Config; diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index 184d40cd981a..be753d2f387d 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -75,7 +75,6 @@ std::unique_ptr HiveConnector::createDataSource( std::string, std::shared_ptr>& columnHandles, ConnectorQueryCtx* connectorQueryCtx) { - return std::make_unique( outputType, tableHandle, diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 16bd3a503ede..19b7b49b7aaa 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -480,8 +480,8 @@ std::unique_ptr HiveDataSource::createSplitReader() { readerOutputType_, &partitionKeys_, fileHandleFactory_, - executor_, connectorQueryCtx_, + executor_, ioStats_); } @@ -497,7 +497,6 @@ void HiveDataSource::addSplit(std::shared_ptr split) { if (splitReader_) { splitReader_.reset(); } - splitReader_ = createSplitReader(); splitReader_->prepareSplit(metadataFilter_, runtimeStats_); } diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index 2d54db3fda64..8e448e18a35f 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -143,9 +143,9 @@ class HiveDataSource : public DataSource { SelectivityVector filterRows_; exec::FilterEvalCtx filterEvalCtx_; - FileHandleFactory* fileHandleFactory_; - const ConnectorQueryCtx* const connectorQueryCtx_; - folly::Executor* executor_; + FileHandleFactory* const fileHandleFactory_; + ConnectorQueryCtx* const connectorQueryCtx_; + folly::Executor* const executor_; }; } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index f8e5a8f8cffe..ccb2203dbbab 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -19,12 +19,13 @@ #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/TableHandle.h" +#include "velox/connectors/hive/iceberg/IcebergSplitReader.h" #include "velox/dwio/common/CachedBufferedInput.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/ReaderFactory.h" #include -#include +// #include #include #include @@ -144,19 +145,34 @@ std::unique_ptr SplitReader::create( std::unordered_map>* partitionKeys, FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, folly::Executor* executor, - const ConnectorQueryCtx* connectorQueryCtx, std::shared_ptr ioStats) { - return std::make_unique( - hiveSplit, - hiveTableHandle, - scanSpec, - readerOutputType, - partitionKeys, - fileHandleFactory, - executor, - connectorQueryCtx, - ioStats); + // Create the SplitReader based on hiveSplit->customSplitInfo["table_format"] + if (hiveSplit->customSplitInfo["table_format"] == "hive_iceberg") { + return std::make_unique( + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + + connectorQueryCtx, + executor, + ioStats); + } else { + return std::make_unique( + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + connectorQueryCtx, + executor, + ioStats); + } } SplitReader::SplitReader( @@ -167,8 +183,8 @@ SplitReader::SplitReader( std::unordered_map>* partitionKeys, FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, folly::Executor* executor, - const ConnectorQueryCtx* connectorQueryCtx, std::shared_ptr ioStats) : hiveSplit_(hiveSplit), hiveTableHandle_(hiveTableHandle), @@ -176,8 +192,8 @@ SplitReader::SplitReader( readerOutputType_(readerOutputType), partitionKeys_(partitionKeys), fileHandleFactory_(fileHandleFactory), - executor_(executor), connectorQueryCtx_(connectorQueryCtx), + executor_(executor), ioStats_(ioStats), baseReaderOpts_(connectorQueryCtx->memoryPool()) {} diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index 52417c642571..0f7fabd7b0f3 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -51,8 +51,8 @@ class SplitReader { std::unordered_map>* partitionKeys, FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, folly::Executor* executor, - const ConnectorQueryCtx* connectorQueryCtx, std::shared_ptr ioStats); SplitReader( @@ -63,8 +63,8 @@ class SplitReader { std::unordered_map>* partitionKeys, FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, folly::Executor* executor, - const ConnectorQueryCtx* connectorQueryCtx, std::shared_ptr ioStats); virtual ~SplitReader() = default; @@ -122,8 +122,8 @@ class SplitReader { std::unique_ptr baseReader_; std::unique_ptr baseRowReader_; FileHandleFactory* const fileHandleFactory_; + ConnectorQueryCtx* const connectorQueryCtx_; folly::Executor* const executor_; - const ConnectorQueryCtx* const connectorQueryCtx_; std::shared_ptr ioStats_; private: diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt new file mode 100644 index 000000000000..d6f9148680c5 --- /dev/null +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -0,0 +1,27 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_library(velox_hive_iceberg_splitreader + IcebergSplitReader.cpp IcebergSplit.cpp DeleteFileReader.cpp) + +target_link_libraries( + velox_hive_iceberg_splitreader + Folly::folly + gflags::gflags + glog::glog + gtest + gtest_main + xsimd) + +add_subdirectory(tests) diff --git a/velox/connectors/hive/iceberg/DeleteFileReader.cpp b/velox/connectors/hive/iceberg/DeleteFileReader.cpp new file mode 100644 index 000000000000..143f2e73609b --- /dev/null +++ b/velox/connectors/hive/iceberg/DeleteFileReader.cpp @@ -0,0 +1,192 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/DeleteFileReader.h" + +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" + +namespace facebook::velox::connector::hive::iceberg { +DeleteFileReader::DeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::string& baseFilePath, + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + ConnectorQueryCtx* connectorQueryCtx, + uint64_t splitOffset, + const std::string& connectorId) + : deleteFile_(deleteFile), + baseFilePath_(baseFilePath), + fileHandleFactory_(fileHandleFactory), + executor_(executor), + connectorQueryCtx_(connectorQueryCtx), + splitOffset_(splitOffset), + deletePositionsVector_(nullptr), + deletePositionsOffset_(-1), + endOfFile_(false) { + if (deleteFile.content == FileContent::kPositionalDeletes) { + createPositionalDeleteDataSource(deleteFile, connectorId); + } else if (deleteFile.content == FileContent::kEqualityDeletes) { + VELOX_NYI("Iceberg equality delete files are not supported yet."); + } else { + VELOX_FAIL("Unrecogonized Iceberg delete file type: ", deleteFile.content); + } +} + +void DeleteFileReader::createPositionalDeleteDataSource( + const IcebergDeleteFile& deleteFile, + const std::string& connectorId) { + auto filePathColumn = ICEBERG_DELETE_FILE_PATH_COLUMN(); + auto positionsColumn = ICEBERG_DELETE_FILE_POSITIONS_COLUMN(); + + std::vector deleteColumnNames( + {filePathColumn->name, positionsColumn->name}); + std::vector> deleteColumnTypes( + {filePathColumn->type, positionsColumn->type}); + + RowTypePtr deleteRowType = + ROW(std::move(deleteColumnNames), std::move(deleteColumnTypes)); + + std::unordered_map> + deleteColumnHandles; + deleteColumnHandles[filePathColumn->name] = + std::make_shared( + filePathColumn->name, + HiveColumnHandle::ColumnType::kRegular, + VARCHAR(), + VARCHAR()); + deleteColumnHandles[positionsColumn->name] = + std::make_shared( + positionsColumn->name, + HiveColumnHandle::ColumnType::kRegular, + BIGINT(), + BIGINT()); + + size_t lastPathDelimiterPos = deleteFile.filePath.find_last_of('/'); + std::string deleteFileName = deleteFile.filePath.substr( + lastPathDelimiterPos, deleteFile.filePath.size() - lastPathDelimiterPos); + + // TODO: Build filters on the path column: filePathColumn = baseFilePath_ + // TODO: Build filters on the positionsColumn: + // positionsColumn >= baseReadOffset_ + splitOffsetInFile + SubfieldFilters subfieldFilters = {}; + + auto deleteTableHandle = std::make_shared( + connectorId, deleteFileName, false, std::move(subfieldFilters), nullptr); + + auto deleteSplit = std::make_shared( + connectorId, + deleteFile.filePath, + deleteFile.fileFormat, + 0, + deleteFile.fileSizeInBytes); + + deleteDataSource_ = std::make_unique( + deleteRowType, + deleteTableHandle, + deleteColumnHandles, + fileHandleFactory_, + executor_, + connectorQueryCtx_); + + deleteDataSource_->addSplit(deleteSplit); +} + +void DeleteFileReader::readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + int8_t* deleteBitmap) { + ContinueFuture blockingFuture(ContinueFuture::makeEmpty()); + // We are going to read to the row number up to the end of the batch. For the + // same base file, the deleted rows are in ascending order in the same delete + // file + int64_t rowNumberUpperBound = splitOffset_ + baseReadOffset + size; + + // Finish unused delete positions from last batch + if (deletePositionsVector_ && + deletePositionsOffset_ < deletePositionsVector_->size()) { + readDeletePositionsVec(baseReadOffset, rowNumberUpperBound, deleteBitmap); + + if (readFinishedForBatch(rowNumberUpperBound)) { + return; + } + } + + uint64_t numRowsToRead = std::min(size, deleteFile_.recordCount); + while (true) { + std::optional deletesResult = + deleteDataSource_->next(numRowsToRead, blockingFuture); + + if (!deletesResult.has_value()) { + return; + } + + auto data = deletesResult.value(); + if (data) { + if (data->size() > 0) { + VELOX_CHECK(data->childrenSize() > 0); + data->loadedVector(); + + deletePositionsVector_ = data->childAt(1); + deletePositionsOffset_ = 0; + + readDeletePositionsVec( + baseReadOffset, rowNumberUpperBound, deleteBitmap); + + if (readFinishedForBatch(rowNumberUpperBound)) { + return; + } + } + continue; + } else { + endOfFile_ = true; + return; + } + } +} + +bool DeleteFileReader::endOfFile() { + return endOfFile_; +} + +void DeleteFileReader::readDeletePositionsVec( + uint64_t baseReadOffset, + int64_t rowNumberUpperBound, + int8_t* deleteBitmap) { + // Convert the positions in file into positions relative to the start of the + // split. + const int64_t* deletePositions = + deletePositionsVector_->as>()->rawValues(); + int64_t offset = baseReadOffset + splitOffset_; + while (deletePositionsOffset_ < deletePositionsVector_->size() && + deletePositions[deletePositionsOffset_] < rowNumberUpperBound) { + bits::setBit( + deleteBitmap, deletePositions[deletePositionsOffset_] - offset); + deletePositionsOffset_++; + } +} + +bool DeleteFileReader::readFinishedForBatch(int64_t rowNumberUpperBound) { + const int64_t* deletePositions = + deletePositionsVector_->as>()->rawValues(); + if (deletePositionsOffset_ < deletePositionsVector_->size() && + deletePositions[deletePositionsOffset_] >= rowNumberUpperBound) { + return true; + } + return false; +} + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/DeleteFileReader.h b/velox/connectors/hive/iceberg/DeleteFileReader.h new file mode 100644 index 000000000000..140b6f3d8889 --- /dev/null +++ b/velox/connectors/hive/iceberg/DeleteFileReader.h @@ -0,0 +1,84 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/connectors/hive/FileHandle.h" +#include "velox/connectors/hive/HiveDataSource.h" + +#include +#include + +namespace facebook::velox::cache { +class AsyncDataCache; +} + +namespace facebook::velox::connector { +class ConnectorQueryCtx; +} + +namespace facebook::velox::core { +class ExpressionEvaluator; +} + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; + +class DeleteFileReader { + public: + DeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::string& baseFilePath, + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + ConnectorQueryCtx* connectorQueryCtx, + uint64_t splitOffset, + const std::string& connectorId); + + void readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + int8_t* deleteBitmap); + + bool endOfFile(); + + private: + void createPositionalDeleteDataSource( + const IcebergDeleteFile& deleteFile, + const std::string& connectorId); + + void readDeletePositionsVec( + uint64_t baseReadOffset, + int64_t rowNumberUpperBound, + int8_t* deleteBitmap); + + bool readFinishedForBatch(int64_t rowNumberUpperBound); + + const IcebergDeleteFile& deleteFile_; + const std::string& baseFilePath_; + FileHandleFactory* const fileHandleFactory_; + folly::Executor* const executor_; + ConnectorQueryCtx* const connectorQueryCtx_; + uint64_t splitOffset_; + + std::unique_ptr deleteDataSource_; + VectorPtr deletePositionsVector_; + uint64_t deletePositionsOffset_; + bool endOfFile_; +}; + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergDeleteFile.h b/velox/connectors/hive/iceberg/IcebergDeleteFile.h new file mode 100644 index 000000000000..1cff2b8efbe2 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergDeleteFile.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/dwio/common/Options.h" + +#include +#include +#include + +namespace facebook::velox::connector::hive::iceberg { + +enum class FileContent { + kData, + kPositionalDeletes, + kEqualityDeletes, +}; + +struct IcebergDeleteFile { + FileContent content; + const std::string filePath; + dwio::common::FileFormat fileFormat; + uint64_t recordCount; + uint64_t fileSizeInBytes; + // The field ids for the delete columns for equality delete files + std::vector equalityFieldIds; + // The lower bounds of the in-file positions for the deleted rows, identified + // by each column's field id. E.g. The deleted rows for a column with field id + // 1 is in range [10, 50], where 10 and 50 are the deleted row positions in + // the data file, then lowerBounds would contain entry <1, "10"> + std::unordered_map lowerBounds; + // The upper bounds of the in-file positions for the deleted rows, identified + // by each column's field id. E.g. The deleted rows for a column with field id + // 1 is in range [10, 50], then upperBounds will contain entry <1, "50"> + std::unordered_map upperBounds; + + IcebergDeleteFile( + FileContent _content, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _recordCount, + uint64_t _fileSizeInBytes, + std::vector _equalityFieldIds = {}, + std::unordered_map _lowerBounds = {}, + std::unordered_map _upperBounds = {}) + : content(_content), + filePath(_filePath), + fileFormat(_fileFormat), + recordCount(_recordCount), + fileSizeInBytes(_fileSizeInBytes), + equalityFieldIds(_equalityFieldIds), + lowerBounds(_lowerBounds), + upperBounds(_upperBounds) {} +}; + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergMetadataColumns.h b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h new file mode 100644 index 000000000000..d4442321e53a --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/type/Type.h" + +#include + +namespace facebook::velox::connector::hive::iceberg { + +const std::string kIcebergDeleteFilePathColumn = "file_path"; +const std::string kIcebergRowPositionColumn = "pos"; + +struct IcebergMetadataColumn { + int id; + std::string name; + std::shared_ptr type; + std::string doc; + + IcebergMetadataColumn( + int _id, + const std::string& _name, + std::shared_ptr _type, + const std::string& _doc) + : id(_id), name(_name), type(_type), doc(_doc) {} +}; + +#define ICEBERG_DELETE_FILE_PATH_COLUMN() \ + std::make_shared( \ + 2147483546, \ + kIcebergDeleteFilePathColumn, \ + VARCHAR(), \ + "Path of a file in which a deleted row is stored"); + +#define ICEBERG_DELETE_FILE_POSITIONS_COLUMN() \ + std::make_shared( \ + 2147483545, \ + kIcebergRowPositionColumn, \ + BIGINT(), \ + "Ordinal position of a deleted row in the data file"); + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergSplit.cpp b/velox/connectors/hive/iceberg/IcebergSplit.cpp new file mode 100644 index 000000000000..fe256b073b17 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplit.cpp @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/IcebergSplit.h" + +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" + +namespace facebook::velox::connector::hive::iceberg { + +HiveIcebergSplit::HiveIcebergSplit( + const std::string& _connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start, + uint64_t _length, + const std::unordered_map>& + _partitionKeys, + std::optional _tableBucketNumber, + const std::unordered_map& _customSplitInfo, + const std::shared_ptr& _extraFileInfo) + : HiveConnectorSplit( + _connectorId, + _filePath, + _fileFormat, + _start, + _length, + _partitionKeys, + _tableBucketNumber) { + // TODO: Deserialize _extraFileInfo to get deleteFiles; +} + +// For tests only +HiveIcebergSplit::HiveIcebergSplit( + const std::string& _connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start, + uint64_t _length, + const std::unordered_map>& + _partitionKeys, + std::optional _tableBucketNumber, + const std::unordered_map& _customSplitInfo, + const std::shared_ptr& _extraFileInfo, + std::vector _deletes) + : HiveConnectorSplit( + _connectorId, + _filePath, + _fileFormat, + _start, + _length, + _partitionKeys, + _tableBucketNumber, + _customSplitInfo, + _extraFileInfo), + deleteFiles(_deletes) {} +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergSplit.h b/velox/connectors/hive/iceberg/IcebergSplit.h new file mode 100644 index 000000000000..25f1e6a4e4b1 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplit.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/connectors/hive/HiveConnectorSplit.h" + +#include + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; + +struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { + std::vector deleteFiles; + + HiveIcebergSplit( + const std::string& connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start = 0, + uint64_t _length = std::numeric_limits::max(), + const std::unordered_map>& + _partitionKeys = {}, + std::optional _tableBucketNumber = std::nullopt, + const std::unordered_map& _customSplitInfo = {}, + const std::shared_ptr& _extraFileInfo = {}); + + // For tests only + HiveIcebergSplit( + const std::string& connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start = 0, + uint64_t _length = std::numeric_limits::max(), + const std::unordered_map>& + _partitionKeys = {}, + std::optional _tableBucketNumber = std::nullopt, + const std::unordered_map& _customSplitInfo = {}, + const std::shared_ptr& _extraFileInfo = {}, + std::vector deletes = {}); +}; + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp new file mode 100644 index 000000000000..5c4cbea344ec --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -0,0 +1,105 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/IcebergSplitReader.h" + +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/dwio/common/BufferUtil.h" + +using namespace facebook::velox::dwio::common; + +namespace facebook::velox::connector::hive::iceberg { + +IcebergSplitReader::IcebergSplitReader( + std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, + const RowTypePtr readerOutputType, + std::unordered_map>* + partitionKeys, + FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + std::shared_ptr ioStats) + : SplitReader( + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + connectorQueryCtx, + executor, + ioStats) {} + +void IcebergSplitReader::prepareSplit( + std::shared_ptr metadataFilter, + dwio::common::RuntimeStatistics& runtimeStats) { + SplitReader::prepareSplit(metadataFilter, runtimeStats); + baseReadOffset_ = 0; + deleteFileReaders_.clear(); + splitOffset_ = baseRowReader_->nextRowNumber(); + + // TODO: Deserialize the std::vector deleteFiles. For now + // we assume it's already deserialized. + std::shared_ptr icebergSplit = + std::dynamic_pointer_cast(hiveSplit_); + + const auto& deleteFiles = icebergSplit->deleteFiles; + for (const auto& deleteFile : deleteFiles) { + deleteFileReaders_.push_back(std::make_unique( + deleteFile, + hiveSplit_->filePath, + fileHandleFactory_, + executor_, + connectorQueryCtx_, + splitOffset_, + hiveSplit_->connectorId)); + } +} + +uint64_t IcebergSplitReader::next(int64_t size, VectorPtr& output) { + Mutation mutation; + mutation.deletedRows = nullptr; + + if (!deleteFileReaders_.empty()) { + auto numBytes = bits::nbytes(size); + dwio::common::ensureCapacity( + deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool()); + std::memset((void*)deleteBitmap_->as(), 0L, numBytes); + + for (auto iter = deleteFileReaders_.begin(); + iter != deleteFileReaders_.end(); + iter++) { + (*iter)->readDeletePositions( + baseReadOffset_, size, deleteBitmap_->asMutable()); + if ((*iter)->endOfFile()) { + iter = deleteFileReaders_.erase(iter); + } + } + + deleteBitmap_->setSize(numBytes); + mutation.deletedRows = deleteBitmap_->as(); + } + + auto rowsScanned = baseRowReader_->next(size, output, &mutation); + baseReadOffset_ += rowsScanned; + + return rowsScanned; +} + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h new file mode 100644 index 000000000000..520ff5e9aa4b --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/connectors/hive/SplitReader.h" +#include "velox/connectors/hive/iceberg/DeleteFileReader.h" + +namespace facebook::velox::connector { +class ColumnHandle; +} + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; + +class IcebergSplitReader : public SplitReader { + public: + IcebergSplitReader( + std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, + const RowTypePtr readerOutputType, + std::unordered_map>* + partitionKeys, + FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + std::shared_ptr ioStats); + + ~IcebergSplitReader() override = default; + + void prepareSplit( + std::shared_ptr metadataFilter, + dwio::common::RuntimeStatistics& runtimeStats) override; + + uint64_t next(int64_t size, VectorPtr& output) override; + + private: + // The read offset to the beginning of the split in number of rows for the + // current batch for the base data file + uint64_t baseReadOffset_; + // The file position for the first row in the split + uint64_t splitOffset_; + std::list> deleteFileReaders_; + BufferPtr deleteBitmap_; +}; +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/cmake/Config.cmake.in b/velox/connectors/hive/iceberg/cmake/Config.cmake.in new file mode 100644 index 000000000000..0bf51ecc28f5 --- /dev/null +++ b/velox/connectors/hive/iceberg/cmake/Config.cmake.in @@ -0,0 +1,4 @@ +@PACKAGE_INIT@ + +include("${CMAKE_CURRENT_LIST_DIR}/iceberg.cmake") +check_required_components("@PROJECT_NAME@") diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt new file mode 100644 index 000000000000..63603c724ec2 --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt @@ -0,0 +1,34 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +if(NOT VELOX_DISABLE_GOOGLETEST) + + add_executable(velox_hive_iceberg_test IcebergReadTest.cpp) + add_test(velox_hive_iceberg_test velox_hive_iceberg_test) + + target_link_libraries( + velox_hive_iceberg_test + velox_hive_connector + velox_hive_iceberg_splitreader + velox_hive_partition_function + velox_dwio_common_exception + velox_dwio_common_test_utils + velox_dwio_dwrf_proto + velox_vector_test_lib + velox_exec + velox_exec_test_lib + Folly::folly + gtest + gtest_main) + +endif() diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp new file mode 100644 index 000000000000..6b599861312a --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -0,0 +1,212 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/exec/PlanNodeStats.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +#include + +using namespace facebook::velox::exec::test; +using namespace facebook::velox::exec; +using namespace facebook::velox::dwio::common; +using namespace facebook::velox::test; + +namespace facebook::velox::connector::hive::iceberg { + +class HiveIcebergTest : public HiveConnectorTestBase { + public: + void assertPositionalDeletes(const std::vector& deleteRows) { + assertPositionalDeletes( + deleteRows, + "SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRows) + + ")"); + } + void assertPositionalDeletes( + const std::vector& deleteRows, + std::string duckdbSql) { + std::shared_ptr dataFilePath = writeDataFile(rowCount); + std::shared_ptr deleteFilePath = + writePositionDeleteFile(dataFilePath->path, deleteRows); + + IcebergDeleteFile deleteFile( + FileContent::kPositionalDeletes, + deleteFilePath->path, + fileFomat_, + deleteRows.size(), + testing::internal::GetFileSize( + std::fopen(deleteFilePath->path.c_str(), "r"))); + + auto icebergSplit = makeIcebergSplit(dataFilePath->path, {deleteFile}); + + auto plan = tableScanNode(); + auto task = OperatorTestBase::assertQuery(plan, {icebergSplit}, duckdbSql); + + auto planStats = toPlanStats(task->taskStats()); + auto scanNodeId = plan->id(); + auto it = planStats.find(scanNodeId); + ASSERT_TRUE(it != planStats.end()); + ASSERT_TRUE(it->second.peakMemoryBytes > 0); + } + + std::vector makeRandomDeleteRows(int32_t maxRowNumber) { + std::mt19937 gen{0}; + std::vector deleteRows; + for (int i = 0; i < maxRowNumber; i++) { + if (folly::Random::rand32(0, 10, gen) > 8) { + deleteRows.push_back(i); + } + } + return deleteRows; + } + + std::vector makeSequenceRows(int32_t maxRowNumber) { + std::vector deleteRows; + deleteRows.resize(maxRowNumber); + std::iota(deleteRows.begin(), deleteRows.end(), 0); + return deleteRows; + } + + const static int rowCount = 20000; + + private: + std::shared_ptr makeIcebergSplit( + const std::string& dataFilePath, + const std::vector& deleteFiles = {}) { + std::unordered_map> partitionKeys; + std::unordered_map customSplitInfo; + customSplitInfo["table_format"] = "hive_iceberg"; + + auto file = filesystems::getFileSystem(dataFilePath, nullptr) + ->openFileForRead(dataFilePath); + const int64_t fileSize = file->size(); + + return std::make_shared( + kHiveConnectorId, + dataFilePath, + fileFomat_, + 0, + fileSize, + partitionKeys, + std::nullopt, + customSplitInfo, + nullptr, + deleteFiles); + } + + std::vector makeVectors(int32_t count, int32_t rowsPerVector) { + std::vector vectors; + + for (int i = 0; i < count; i++) { + auto data = makeSequenceRows(rowsPerVector); + VectorPtr c0 = vectorMaker_.flatVector(data); + vectors.push_back(makeRowVector({"c0"}, {c0})); + } + + return vectors; + } + + std::shared_ptr writeDataFile(uint64_t numRows) { + auto dataVectors = makeVectors(1, numRows); + + auto dataFilePath = TempFilePath::create(); + writeToFile(dataFilePath->path, dataVectors); + createDuckDbTable(dataVectors); + return dataFilePath; + } + + std::shared_ptr writePositionDeleteFile( + const std::string& dataFilePath, + const std::vector& deleteRows) { + uint32_t numDeleteRows = deleteRows.size(); + + auto child = vectorMaker_.flatVector(std::vector{1UL}); + + auto filePathVector = vectorMaker_.flatVector( + numDeleteRows, [&](auto row) { return StringView(dataFilePath); }); + auto deletePositionsVector = vectorMaker_.flatVector(deleteRows); + RowVectorPtr deleteFileVectors = makeRowVector( + {pathColumn_->name, posColumn_->name}, + {filePathVector, deletePositionsVector}); + + auto deleteFilePath = TempFilePath::create(); + writeToFile(deleteFilePath->path, deleteFileVectors); + + return deleteFilePath; + } + + std::string makeNotInList(const std::vector& deleteRows) { + if (deleteRows.empty()) { + return ""; + } + + return std::accumulate( + deleteRows.begin() + 1, + deleteRows.end(), + std::to_string(deleteRows[0]), + [](const std::string& a, int64_t b) { + return a + ", " + std::to_string(b); + }); + } + + std::shared_ptr assertQuery( + const core::PlanNodePtr& plan, + std::shared_ptr dataFilePath, + const std::vector& deleteFiles, + const std::string& duckDbSql) { + auto icebergSplit = makeIcebergSplit(dataFilePath->path, deleteFiles); + return OperatorTestBase::assertQuery(plan, {icebergSplit}, duckDbSql); + } + + core::PlanNodePtr tableScanNode() { + return PlanBuilder(pool_.get()).tableScan(rowType_).planNode(); + } + + private: + dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF}; + RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})}; + std::shared_ptr pathColumn_ = + ICEBERG_DELETE_FILE_PATH_COLUMN(); + std::shared_ptr posColumn_ = + ICEBERG_DELETE_FILE_POSITIONS_COLUMN(); +}; + +TEST_F(HiveIcebergTest, positionalDeletes) { + folly::SingletonVault::singleton()->registrationComplete(); + + // Delete row 0, 1, 2, 3 from the first batch out of two. + assertPositionalDeletes({0, 1, 2, 3}); + // Delete the first and last row in each batch (10000 rows per batch) + assertPositionalDeletes({0, 9999, 10000, 19999}); + // Delete several rows in the second batch (10000 rows per batch) + assertPositionalDeletes({10000, 10002, 19999}); + // Delete random rows + assertPositionalDeletes(makeRandomDeleteRows(rowCount)); + // Delete 0 rows + assertPositionalDeletes({}, "SELECT * FROM tmp"); + // Delete all rows + assertPositionalDeletes( + makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0"); + // Delete rows that don't exist + assertPositionalDeletes({20000, 29999}); +} + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/dwio/dwrf/test/CMakeLists.txt b/velox/dwio/dwrf/test/CMakeLists.txt index f99975fb4aab..936e847ec367 100644 --- a/velox/dwio/dwrf/test/CMakeLists.txt +++ b/velox/dwio/dwrf/test/CMakeLists.txt @@ -347,6 +347,7 @@ add_test(velox_dwrf_e2e_filter_test velox_dwrf_e2e_filter_test) target_link_libraries( velox_dwrf_e2e_filter_test velox_e2e_filter_test_base + velox_hive_connector velox_link_libs velox_test_util Folly::folly diff --git a/velox/dwio/parquet/tests/reader/CMakeLists.txt b/velox/dwio/parquet/tests/reader/CMakeLists.txt index ff8aa8a9c27e..c94ab590f638 100644 --- a/velox/dwio/parquet/tests/reader/CMakeLists.txt +++ b/velox/dwio/parquet/tests/reader/CMakeLists.txt @@ -43,6 +43,7 @@ target_link_libraries( ZLIB::ZLIB ${TEST_LINK_LIBS}) +message(STATUS "velox_hive_connector_libs ${velox_hive_connector_libs}") add_executable(velox_dwio_parquet_reader_benchmark ParquetReaderBenchmark.cpp) target_link_libraries( velox_dwio_parquet_reader_benchmark