diff --git a/velox/common/testutil/tests/CMakeLists.txt b/velox/common/testutil/tests/CMakeLists.txt index 02b7ee5f1045..d232c201d931 100644 --- a/velox/common/testutil/tests/CMakeLists.txt +++ b/velox/common/testutil/tests/CMakeLists.txt @@ -16,11 +16,5 @@ add_executable(velox_test_util_test TestValueTest.cpp SpillConfigTest.cpp) gtest_add_tests(velox_test_util_test "" AUTO) target_link_libraries( - velox_test_util_test - PRIVATE - velox_test_util - velox_exception - velox_spill_config - velox_exec - gtest - gtest_main) + velox_test_util_test PRIVATE velox_test_util velox_exception + velox_spill_config velox_exec gtest gtest_main) diff --git a/velox/connectors/hive/CMakeLists.txt b/velox/connectors/hive/CMakeLists.txt index 5fbd19245169..772aac7d7506 100644 --- a/velox/connectors/hive/CMakeLists.txt +++ b/velox/connectors/hive/CMakeLists.txt @@ -13,9 +13,10 @@ # limitations under the License. add_library(velox_hive_config OBJECT HiveConfig.cpp) - target_link_libraries(velox_hive_config velox_exception) +add_subdirectory(iceberg) + add_library( velox_hive_connector OBJECT FileHandle.cpp @@ -30,18 +31,19 @@ add_library( target_link_libraries( velox_hive_connector - velox_common_io - velox_connector - velox_dwio_catalog_fbhive - velox_dwio_dwrf_reader - velox_dwio_dwrf_writer - velox_dwio_parquet_reader - velox_dwio_parquet_writer - velox_file - velox_hive_partition_function - velox_s3fs - velox_hdfs - velox_gcs) + PUBLIC velox_hive_iceberg_splitreader + PRIVATE velox_common_io + velox_connector + velox_dwio_catalog_fbhive + velox_dwio_dwrf_reader + velox_dwio_dwrf_writer + velox_dwio_parquet_reader + velox_dwio_parquet_writer + velox_file + velox_hive_partition_function + velox_s3fs + velox_hdfs + velox_gcs) add_library(velox_hive_partition_function HivePartitionFunction.cpp) diff --git a/velox/connectors/hive/FileHandle.h b/velox/connectors/hive/FileHandle.h index 15edd9d2ac2f..9482051fcb3b 100644 --- a/velox/connectors/hive/FileHandle.h +++ b/velox/connectors/hive/FileHandle.h @@ -25,15 +25,15 @@ #pragma once -#include -#include -#include - #include "velox/common/caching/CachedFactory.h" #include "velox/common/caching/FileIds.h" #include "velox/common/file/File.h" #include "velox/dwio/common/InputStream.h" + namespace facebook::velox { class Config; diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index 522ec99058f4..be753d2f387d 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -75,27 +75,13 @@ std::unique_ptr<DataSource> HiveConnector::createDataSource( std::string, std::shared_ptr<ColumnHandle>>& columnHandles, ConnectorQueryCtx* connectorQueryCtx) { - dwio::common::ReaderOptions options(connectorQueryCtx->memoryPool()); - options.setMaxCoalesceBytes( - HiveConfig::maxCoalescedBytes(connectorQueryCtx->config())); - options.setMaxCoalesceDistance( - HiveConfig::maxCoalescedDistanceBytes(connectorQueryCtx->config())); - options.setFileColumnNamesReadAsLowerCase( - HiveConfig::isFileColumnNamesReadAsLowerCase( - connectorQueryCtx->config())); - options.setUseColumnNamesForColumnMapping( - HiveConfig::isOrcUseColumnNames(connectorQueryCtx->config())); - return std::make_unique<HiveDataSource>( outputType, tableHandle, columnHandles, &fileHandleFactory_, - connectorQueryCtx->expressionEvaluator(), - connectorQueryCtx->cache(), - connectorQueryCtx->scanId(), executor_, - options); + connectorQueryCtx); } std::unique_ptr<DataSink> HiveConnector::createDataSink( diff --git
a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 279f892bb50d..19b7b49b7aaa 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -16,15 +16,14 @@ #include "velox/connectors/hive/HiveDataSource.h" -#include -#include - -#include "velox/dwio/common/CachedBufferedInput.h" -#include "velox/dwio/common/DirectBufferedInput.h" +#include "velox/connectors/hive/HiveConfig.h" #include "velox/dwio/common/ReaderFactory.h" #include "velox/expression/ExprToSubfieldFilter.h" #include "velox/expression/FieldReference.h" +#include +#include + namespace facebook::velox::connector::hive { class HiveTableHandle; @@ -358,18 +357,13 @@ HiveDataSource::HiveDataSource( std::string, std::shared_ptr<ColumnHandle>>& columnHandles, FileHandleFactory* fileHandleFactory, - core::ExpressionEvaluator* expressionEvaluator, - cache::AsyncDataCache* cache, - const std::string& scanId, folly::Executor* executor, - const dwio::common::ReaderOptions& options) - : fileHandleFactory_(fileHandleFactory), - readerOpts_(options), - pool_(&options.getMemoryPool()), + ConnectorQueryCtx* connectorQueryCtx) + : pool_(connectorQueryCtx->memoryPool()), outputType_(outputType), - expressionEvaluator_(expressionEvaluator), - cache_(cache), - scanId_(scanId), + expressionEvaluator_(connectorQueryCtx->expressionEvaluator()), + fileHandleFactory_(fileHandleFactory), + connectorQueryCtx_(connectorQueryCtx), executor_(executor) { // Column handles keyed on the column alias, the name used in the query. for (const auto& [canonicalizedName, columnHandle] : columnHandles) { @@ -410,7 +404,8 @@ HiveDataSource::HiveDataSource( VELOX_CHECK( hiveTableHandle_ != nullptr, "TableHandle must be an instance of HiveTableHandle"); - if (readerOpts_.isFileColumnNamesReadAsLowerCase()) { + if (HiveConfig::isFileColumnNamesReadAsLowerCase( + connectorQueryCtx->config())) { checkColumnNameLowerCase(outputType_); checkColumnNameLowerCase(hiveTableHandle_->subfieldFilters()); checkColumnNameLowerCase(hiveTableHandle_->remainingFilter()); @@ -474,62 +469,20 @@ HiveDataSource::HiveDataSource( *scanSpec_, *remainingFilter, expressionEvaluator_); } - readerOpts_.setFileSchema(hiveTableHandle_->dataColumns()); ioStats_ = std::make_shared<io::IoStatistics>(); } -inline uint8_t parseDelimiter(const std::string& delim) { - for (char const& ch : delim) { - if (!std::isdigit(ch)) { - return delim[0]; - } - } - return stoi(delim); -} - -void HiveDataSource::parseSerdeParameters( - const std::unordered_map<std::string, std::string>& serdeParameters) { - auto fieldIt = serdeParameters.find(dwio::common::SerDeOptions::kFieldDelim); - if (fieldIt == serdeParameters.end()) { - fieldIt = serdeParameters.find("serialization.format"); - } - auto collectionIt = - serdeParameters.find(dwio::common::SerDeOptions::kCollectionDelim); - if (collectionIt == serdeParameters.end()) { - // For collection delimiter, Hive 1.x, 2.x uses "colelction.delim", but - // Hive 3.x uses "collection.delim".
- // See: https://issues.apache.org/jira/browse/HIVE-16922) - collectionIt = serdeParameters.find("colelction.delim"); - } - auto mapKeyIt = - serdeParameters.find(dwio::common::SerDeOptions::kMapKeyDelim); - - if (fieldIt == serdeParameters.end() && - collectionIt == serdeParameters.end() && - mapKeyIt == serdeParameters.end()) { - return; - } - - uint8_t fieldDelim = '\1'; - uint8_t collectionDelim = '\2'; - uint8_t mapKeyDelim = '\3'; - if (fieldIt != serdeParameters.end()) { - fieldDelim = parseDelimiter(fieldIt->second); - } - if (collectionIt != serdeParameters.end()) { - collectionDelim = parseDelimiter(collectionIt->second); - } - if (mapKeyIt != serdeParameters.end()) { - mapKeyDelim = parseDelimiter(mapKeyIt->second); - } - dwio::common::SerDeOptions serDeOptions( - fieldDelim, collectionDelim, mapKeyDelim); - readerOpts_.setSerDeOptions(serDeOptions); -} - std::unique_ptr HiveDataSource::createSplitReader() { return SplitReader::create( - split_, readerOutputType_, partitionKeys_, scanSpec_, pool_); + split_, + hiveTableHandle_, + scanSpec_, + readerOutputType_, + &partitionKeys_, + fileHandleFactory_, + connectorQueryCtx_, + executor_, + ioStats_); } void HiveDataSource::addSplit(std::shared_ptr split) { @@ -541,30 +494,11 @@ void HiveDataSource::addSplit(std::shared_ptr split) { VLOG(1) << "Adding split " << split_->toString(); - if (readerOpts_.getFileFormat() != dwio::common::FileFormat::UNKNOWN) { - VELOX_CHECK( - readerOpts_.getFileFormat() == split_->fileFormat, - "HiveDataSource received splits of different formats: {} and {}", - toString(readerOpts_.getFileFormat()), - toString(split_->fileFormat)); - } else { - parseSerdeParameters(split_->serdeParameters); - readerOpts_.setFileFormat(split_->fileFormat); - } - - auto fileHandle = fileHandleFactory_->generate(split_->filePath).second; - auto input = createBufferedInput(*fileHandle, readerOpts_); - if (splitReader_) { splitReader_.reset(); } splitReader_ = createSplitReader(); - splitReader_->prepareSplit( - hiveTableHandle_, - readerOpts_, - std::move(input), - metadataFilter_, - runtimeStats_); + splitReader_->prepareSplit(metadataFilter_, runtimeStats_); } std::optional HiveDataSource::next( @@ -788,33 +722,6 @@ std::shared_ptr HiveDataSource::makeScanSpec( return spec; } -std::unique_ptr -HiveDataSource::createBufferedInput( - const FileHandle& fileHandle, - const dwio::common::ReaderOptions& readerOpts) { - if (cache_) { - return std::make_unique( - fileHandle.file, - dwio::common::MetricsLog::voidLog(), - fileHandle.uuid.id(), - cache_, - Connector::getTracker(scanId_, readerOpts.loadQuantum()), - fileHandle.groupId.id(), - ioStats_, - executor_, - readerOpts); - } - return std::make_unique( - fileHandle.file, - dwio::common::MetricsLog::voidLog(), - fileHandle.uuid.id(), - Connector::getTracker(scanId_, readerOpts.loadQuantum()), - fileHandle.groupId.id(), - ioStats_, - executor_, - readerOpts); -} - vector_size_t HiveDataSource::evaluateRemainingFilter(RowVectorPtr& rowVector) { filterRows_.resize(output_->size()); diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index 03d86a137108..8e448e18a35f 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -38,11 +38,8 @@ class HiveDataSource : public DataSource { std::string, std::shared_ptr>& columnHandles, FileHandleFactory* fileHandleFactory, - core::ExpressionEvaluator* expressionEvaluator, - cache::AsyncDataCache* cache, - const std::string& scanId, folly::Executor* executor, 
- const dwio::common::ReaderOptions& options); + ConnectorQueryCtx* connectorQueryCtx); void addSplit(std::shared_ptr split) override; @@ -95,15 +92,9 @@ class HiveDataSource : public DataSource { protected: virtual std::unique_ptr createSplitReader(); - std::unique_ptr createBufferedInput( - const FileHandle&, - const dwio::common::ReaderOptions&); - std::shared_ptr split_; - FileHandleFactory* fileHandleFactory_; - dwio::common::ReaderOptions readerOpts_; - std::shared_ptr scanSpec_; memory::MemoryPool* pool_; + std::shared_ptr scanSpec_; VectorPtr output_; std::unique_ptr splitReader_; @@ -128,9 +119,6 @@ class HiveDataSource : public DataSource { // hold adaptation. void resetSplit(); - void parseSerdeParameters( - const std::unordered_map& serdeParameters); - const RowVectorPtr& getEmptyOutput() { if (!emptyOutput_) { emptyOutput_ = RowVector::createEmpty(outputType_, pool_); @@ -140,7 +128,7 @@ class HiveDataSource : public DataSource { std::shared_ptr hiveTableHandle_; - // The row type for the data source output, not including filter only columns + // The row type for the data source output, not including filter-only columns const RowTypePtr outputType_; std::shared_ptr ioStats_; std::shared_ptr metadataFilter_; @@ -155,9 +143,9 @@ class HiveDataSource : public DataSource { SelectivityVector filterRows_; exec::FilterEvalCtx filterEvalCtx_; - cache::AsyncDataCache* const cache_{nullptr}; - const std::string& scanId_; - folly::Executor* executor_; + FileHandleFactory* const fileHandleFactory_; + ConnectorQueryCtx* const connectorQueryCtx_; + folly::Executor* const executor_; }; } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index a42b64e37141..cecf268dc07e 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -16,10 +16,21 @@ #include "velox/connectors/hive/SplitReader.h" +#include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/TableHandle.h" +#include "velox/connectors/hive/iceberg/IcebergSplitReader.h" +#include "velox/dwio/common/CachedBufferedInput.h" +#include "velox/dwio/common/DirectBufferedInput.h" +#include "velox/dwio/common/Options.h" #include "velox/dwio/common/ReaderFactory.h" +#include +// #include + +#include +#include + namespace facebook::velox::connector::hive { namespace { @@ -57,7 +68,7 @@ bool testFilters( const std::string& filePath, const std::unordered_map>& partitionKey, - std::unordered_map>& + std::unordered_map>* partitionKeysHandle) { auto totalRows = reader->numberOfRows(); const auto& fileTypeWithId = reader->typeWithId(); @@ -70,7 +81,7 @@ bool testFilters( auto iter = partitionKey.find(name); if (iter != partitionKey.end() && iter->second.has_value()) { return applyPartitionFilter( - partitionKeysHandle[name]->dataType()->kind(), + (*partitionKeysHandle)[name]->dataType()->kind(), iter->second.value(), child->filter()); } @@ -116,41 +127,87 @@ velox::variant convertFromString(const std::optional& value) { return velox::variant(ToKind); } +inline uint8_t parseDelimiter(const std::string& delim) { + for (char const& ch : delim) { + if (!std::isdigit(ch)) { + return delim[0]; + } + } + return stoi(delim); +} + } // namespace std::unique_ptr SplitReader::create( std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, const RowTypePtr readerOutputType, - std::unordered_map>& + std::unordered_map>* partitionKeys, - 
std::shared_ptr<common::ScanSpec> scanSpec, - memory::MemoryPool* pool) { + FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + std::shared_ptr<io::IoStatistics> ioStats) { // Create the SplitReader based on hiveSplit->customSplitInfo["table_format"] - return std::make_unique<SplitReader>( - hiveSplit, readerOutputType, partitionKeys, scanSpec, pool); + if (hiveSplit->customSplitInfo["table_format"] == "hive_iceberg") { + return std::make_unique<IcebergSplitReader>( + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + connectorQueryCtx, + executor, + ioStats); + } else { + return std::make_unique<SplitReader>( + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + connectorQueryCtx, + executor, + ioStats); + } } SplitReader::SplitReader( std::shared_ptr<HiveConnectorSplit> hiveSplit, + std::shared_ptr<HiveTableHandle> hiveTableHandle, + std::shared_ptr<common::ScanSpec> scanSpec, const RowTypePtr readerOutputType, - std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>& + std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>* partitionKeys, - std::shared_ptr<common::ScanSpec> scanSpec, - memory::MemoryPool* pool) - : hiveSplit_(std::move(hiveSplit)), + FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + std::shared_ptr<io::IoStatistics> ioStats) + : hiveSplit_(hiveSplit), + hiveTableHandle_(hiveTableHandle), + scanSpec_(scanSpec), readerOutputType_(readerOutputType), partitionKeys_(partitionKeys), - scanSpec_(std::move(scanSpec)), - pool_(pool) {} + fileHandleFactory_(fileHandleFactory), + connectorQueryCtx_(connectorQueryCtx), + executor_(executor), + ioStats_(ioStats), + baseReaderOpts_(connectorQueryCtx->memoryPool()) {} void SplitReader::prepareSplit( - const std::shared_ptr<HiveTableHandle>& hiveTableHandle, - const dwio::common::ReaderOptions& readerOptions, - std::unique_ptr<dwio::common::BufferedInput> baseFileInput, std::shared_ptr<common::MetadataFilter> metadataFilter, dwio::common::RuntimeStatistics& runtimeStats) { - baseReader_ = dwio::common::getReaderFactory(readerOptions.getFileFormat()) - ->createReader(std::move(baseFileInput), readerOptions); + configureReaderOptions(); + + auto fileHandle = fileHandleFactory_->generate(hiveSplit_->filePath).second; + auto baseFileInput = createBufferedInput(*fileHandle, baseReaderOpts_); + + baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.getFileFormat()) + ->createReader(std::move(baseFileInput), baseReaderOpts_); // Note that this doesn't apply to Hudi tables.
emptySplit_ = false; @@ -174,23 +231,23 @@ void SplitReader::prepareSplit( } auto& fileType = baseReader_->rowType(); - auto columnTypes = adaptColumns(fileType, readerOptions.getFileSchema()); + auto columnTypes = adaptColumns(fileType, baseReaderOpts_.getFileSchema()); - auto skipRowsIt = hiveTableHandle->tableParameters().find( + auto skipRowsIt = hiveTableHandle_->tableParameters().find( dwio::common::TableParameter::kSkipHeaderLineCount); - if (skipRowsIt != hiveTableHandle->tableParameters().end()) { - rowReaderOpts_.setSkipRows(folly::to(skipRowsIt->second)); + if (skipRowsIt != hiveTableHandle_->tableParameters().end()) { + baseRowReaderOpts_.setSkipRows(folly::to(skipRowsIt->second)); } - rowReaderOpts_.setScanSpec(scanSpec_); - rowReaderOpts_.setMetadataFilter(metadataFilter); + baseRowReaderOpts_.setScanSpec(scanSpec_); + baseRowReaderOpts_.setMetadataFilter(metadataFilter); configureRowReaderOptions( - rowReaderOpts_, + baseRowReaderOpts_, ROW(std::vector(fileType->names()), std::move(columnTypes))); // NOTE: we firstly reset the finished 'baseRowReader_' of previous split // before setting up for the next one to avoid doubling the peak memory usage. baseRowReader_.reset(); - baseRowReader_ = baseReader_->createRowReader(rowReaderOpts_); + baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_); } std::vector SplitReader::adaptColumns( @@ -287,22 +344,24 @@ void SplitReader::setConstantValue( common::ScanSpec* spec, const TypePtr& type, const velox::variant& value) const { - spec->setConstantValue(BaseVector::createConstant(type, value, 1, pool_)); + spec->setConstantValue(BaseVector::createConstant( + type, value, 1, connectorQueryCtx_->memoryPool())); } void SplitReader::setNullConstantValue( common::ScanSpec* spec, const TypePtr& type) const { - spec->setConstantValue(BaseVector::createNullConstant(type, 1, pool_)); + spec->setConstantValue(BaseVector::createNullConstant( + type, 1, connectorQueryCtx_->memoryPool())); } void SplitReader::setPartitionValue( common::ScanSpec* spec, const std::string& partitionKey, const std::optional& value) const { - auto it = partitionKeys_.find(partitionKey); + auto it = partitionKeys_->find(partitionKey); VELOX_CHECK( - it != partitionKeys_.end(), + it != partitionKeys_->end(), "ColumnHandle is missing for partition key {}", partitionKey); auto constValue = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( @@ -310,6 +369,30 @@ void SplitReader::setPartitionValue( setConstantValue(spec, it->second->dataType(), constValue); } +void SplitReader::configureReaderOptions() { + baseReaderOpts_.setMaxCoalesceBytes( + HiveConfig::maxCoalescedBytes(connectorQueryCtx_->config())); + baseReaderOpts_.setMaxCoalesceDistance( + HiveConfig::maxCoalescedDistanceBytes(connectorQueryCtx_->config())); + baseReaderOpts_.setFileColumnNamesReadAsLowerCase( + HiveConfig::isFileColumnNamesReadAsLowerCase( + connectorQueryCtx_->config())); + baseReaderOpts_.setUseColumnNamesForColumnMapping( + HiveConfig::isOrcUseColumnNames(connectorQueryCtx_->config())); + baseReaderOpts_.setFileSchema(hiveTableHandle_->dataColumns()); + + if (baseReaderOpts_.getFileFormat() != dwio::common::FileFormat::UNKNOWN) { + VELOX_CHECK( + baseReaderOpts_.getFileFormat() == hiveSplit_->fileFormat, + "HiveDataSource received splits of different formats: {} and {}", + dwio::common::toString(baseReaderOpts_.getFileFormat()), + dwio::common::toString(hiveSplit_->fileFormat)); + } else { + parseSerdeParameters(hiveSplit_->serdeParameters); + baseReaderOpts_.setFileFormat(hiveSplit_->fileFormat); + 
} +} + void SplitReader::configureRowReaderOptions( dwio::common::RowReaderOptions& options, const RowTypePtr& rowType) { @@ -329,11 +412,79 @@ void SplitReader::configureRowReaderOptions( options.select(cs).range(hiveSplit_->start, hiveSplit_->length); } +void SplitReader::parseSerdeParameters( + const std::unordered_map& serdeParameters) { + auto fieldIt = serdeParameters.find(dwio::common::SerDeOptions::kFieldDelim); + if (fieldIt == serdeParameters.end()) { + fieldIt = serdeParameters.find("serialization.format"); + } + auto collectionIt = + serdeParameters.find(dwio::common::SerDeOptions::kCollectionDelim); + if (collectionIt == serdeParameters.end()) { + // For collection delimiter, Hive 1.x, 2.x uses "colelction.delim", but + // Hive 3.x uses "collection.delim". + // See: https://issues.apache.org/jira/browse/HIVE-16922) + collectionIt = serdeParameters.find("colelction.delim"); + } + auto mapKeyIt = + serdeParameters.find(dwio::common::SerDeOptions::kMapKeyDelim); + + if (fieldIt == serdeParameters.end() && + collectionIt == serdeParameters.end() && + mapKeyIt == serdeParameters.end()) { + return; + } + + uint8_t fieldDelim = '\1'; + uint8_t collectionDelim = '\2'; + uint8_t mapKeyDelim = '\3'; + if (fieldIt != serdeParameters.end()) { + fieldDelim = parseDelimiter(fieldIt->second); + } + if (collectionIt != serdeParameters.end()) { + collectionDelim = parseDelimiter(collectionIt->second); + } + if (mapKeyIt != serdeParameters.end()) { + mapKeyDelim = parseDelimiter(mapKeyIt->second); + } + dwio::common::SerDeOptions serDeOptions( + fieldDelim, collectionDelim, mapKeyDelim); + baseReaderOpts_.setSerDeOptions(serDeOptions); +} + +std::unique_ptr SplitReader::createBufferedInput( + const FileHandle& fileHandle, + const dwio::common::ReaderOptions& readerOpts) { + if (connectorQueryCtx_->cache()) { + return std::make_unique( + fileHandle.file, + dwio::common::MetricsLog::voidLog(), + fileHandle.uuid.id(), + connectorQueryCtx_->cache(), + Connector::getTracker( + connectorQueryCtx_->scanId(), readerOpts.loadQuantum()), + fileHandle.groupId.id(), + ioStats_, + executor_, + readerOpts); + } + return std::make_unique( + fileHandle.file, + dwio::common::MetricsLog::voidLog(), + fileHandle.uuid.id(), + Connector::getTracker( + connectorQueryCtx_->scanId(), readerOpts.loadQuantum()), + fileHandle.groupId.id(), + ioStats_, + executor_, + readerOpts); +} + std::string SplitReader::toString() const { std::string partitionKeys; std::for_each( - partitionKeys_.begin(), - partitionKeys_.end(), + partitionKeys_->begin(), + partitionKeys_->end(), [&](std::pair< const std::string, std::shared_ptr> diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index 0cac6c4f9d6a..0f7fabd7b0f3 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -16,9 +16,18 @@ #pragma once +#include "velox/connectors/hive/FileHandle.h" #include "velox/dwio/common/Reader.h" #include "velox/type/Type.h" +namespace facebook::velox::cache { +class AsyncDataCache; +} + +namespace facebook::velox::connector { +class ConnectorQueryCtx; +} + namespace facebook::velox::dwio::common { class BufferedInput; } @@ -36,19 +45,27 @@ class SplitReader { public: static std::unique_ptr create( std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, const RowTypePtr readerOutputType, - std::unordered_map>& + std::unordered_map>* partitionKeys, - std::shared_ptr scanSpec, - memory::MemoryPool* pool); + FileHandleFactory* fileHandleFactory, + 
ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + std::shared_ptr ioStats); SplitReader( std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, const RowTypePtr readerOutputType, - std::unordered_map>& + std::unordered_map>* partitionKeys, - std::shared_ptr scanSpec, - memory::MemoryPool* pool); + FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + std::shared_ptr ioStats); virtual ~SplitReader() = default; @@ -56,9 +73,6 @@ class SplitReader { /// do additional preparations before reading the split, e.g. Open delete /// files or log files, and add column adapatations for metadata columns virtual void prepareSplit( - const std::shared_ptr& hiveTableHandle, - const dwio::common::ReaderOptions& readerOptions, - std::unique_ptr baseFileInput, std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats); @@ -100,20 +114,34 @@ class SplitReader { const std::optional& value) const; std::shared_ptr hiveSplit_; + std::shared_ptr hiveTableHandle_; + std::shared_ptr scanSpec_; RowTypePtr readerOutputType_; - std::unordered_map>& + std::unordered_map>* partitionKeys_; - std::shared_ptr scanSpec_; - memory::MemoryPool* pool_; std::unique_ptr baseReader_; - dwio::common::RowReaderOptions rowReaderOpts_; std::unique_ptr baseRowReader_; + FileHandleFactory* const fileHandleFactory_; + ConnectorQueryCtx* const connectorQueryCtx_; + folly::Executor* const executor_; + std::shared_ptr ioStats_; private: + void configureReaderOptions(); + void configureRowReaderOptions( dwio::common::RowReaderOptions& options, const RowTypePtr& rowType); + void parseSerdeParameters( + const std::unordered_map& serdeParameters); + + std::unique_ptr createBufferedInput( + const FileHandle& fileHandle, + const dwio::common::ReaderOptions& readerOpts); + + dwio::common::ReaderOptions baseReaderOpts_; + dwio::common::RowReaderOptions baseRowReaderOpts_; bool emptySplit_; }; diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt new file mode 100644 index 000000000000..d6f9148680c5 --- /dev/null +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -0,0 +1,27 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_library(velox_hive_iceberg_splitreader + IcebergSplitReader.cpp IcebergSplit.cpp DeleteFileReader.cpp) + +target_link_libraries( + velox_hive_iceberg_splitreader + Folly::folly + gflags::gflags + glog::glog + gtest + gtest_main + xsimd) + +add_subdirectory(tests) diff --git a/velox/connectors/hive/iceberg/DeleteFileReader.cpp b/velox/connectors/hive/iceberg/DeleteFileReader.cpp new file mode 100644 index 000000000000..daa428e2475c --- /dev/null +++ b/velox/connectors/hive/iceberg/DeleteFileReader.cpp @@ -0,0 +1,198 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/DeleteFileReader.h" + +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" +#include "velox/type/Filter.h" + +namespace facebook::velox::connector::hive::iceberg { +DeleteFileReader::DeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::string& baseFilePath, + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + ConnectorQueryCtx* connectorQueryCtx, + uint64_t splitOffset, + const std::string& connectorId) + : deleteFile_(deleteFile), + baseFilePath_(baseFilePath), + fileHandleFactory_(fileHandleFactory), + executor_(executor), + connectorQueryCtx_(connectorQueryCtx), + splitOffset_(splitOffset), + deletePositionsVector_(nullptr), + deletePositionsOffset_(-1), + endOfFile_(false) { + if (deleteFile.content == FileContent::kPositionalDeletes) { + createPositionalDeleteDataSource(deleteFile, connectorId); + } else if (deleteFile.content == FileContent::kEqualityDeletes) { + VELOX_NYI("Iceberg equality delete files are not supported yet."); + } else { + VELOX_FAIL("Unrecognized Iceberg delete file type: ", deleteFile.content); + } +} + +void DeleteFileReader::createPositionalDeleteDataSource( + const IcebergDeleteFile& deleteFile, + const std::string& connectorId) { + auto filePathColumn = ICEBERG_DELETE_FILE_PATH_COLUMN(); + auto positionsColumn = ICEBERG_DELETE_FILE_POSITIONS_COLUMN(); + + std::vector<std::string> deleteColumnNames( + {filePathColumn->name, positionsColumn->name}); + std::vector<std::shared_ptr<const Type>> deleteColumnTypes( + {filePathColumn->type, positionsColumn->type}); + + RowTypePtr deleteRowType = + ROW(std::move(deleteColumnNames), std::move(deleteColumnTypes)); + + std::unordered_map<std::string, std::shared_ptr<ColumnHandle>> + deleteColumnHandles; + deleteColumnHandles[filePathColumn->name] = + std::make_shared<HiveColumnHandle>( + filePathColumn->name, + HiveColumnHandle::ColumnType::kRegular, + VARCHAR(), + VARCHAR()); + deleteColumnHandles[positionsColumn->name] = + std::make_shared<HiveColumnHandle>( + positionsColumn->name, + HiveColumnHandle::ColumnType::kRegular, + BIGINT(), + BIGINT()); + + size_t lastPathDelimiterPos = deleteFile.filePath.find_last_of('/'); + std::string deleteFileName = deleteFile.filePath.substr( + lastPathDelimiterPos, deleteFile.filePath.size() - lastPathDelimiterPos); + + // TODO: Build filters on the path column: filePathColumn = baseFilePath_ + // TODO: Build filters on the positionsColumn: + // positionsColumn >= baseReadOffset_ + splitOffsetInFile + SubfieldFilters subfieldFilters; + std::vector<std::string> values = {baseFilePath_}; + std::unique_ptr<common::Filter> pathFilter = + std::make_unique<common::BytesValues>(values, false); + subfieldFilters[common::Subfield(filePathColumn->name)] = + std::move(pathFilter); + + auto deleteTableHandle = std::make_shared<HiveTableHandle>( + connectorId, deleteFileName, false, std::move(subfieldFilters), nullptr); + + auto deleteSplit = std::make_shared<HiveConnectorSplit>( + connectorId, + deleteFile.filePath, + deleteFile.fileFormat, + 0, + deleteFile.fileSizeInBytes); + +
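+ // Read the delete file with a nested HiveDataSource: it scans only the + // (file_path, pos) columns, and the file_path filter built above keeps the + // positions that belong to this split's base file.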
deleteDataSource_ = std::make_unique( + deleteRowType, + deleteTableHandle, + deleteColumnHandles, + fileHandleFactory_, + executor_, + connectorQueryCtx_); + + deleteDataSource_->addSplit(deleteSplit); +} + +void DeleteFileReader::readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + int8_t* deleteBitmap) { + ContinueFuture blockingFuture(ContinueFuture::makeEmpty()); + // We are going to read to the row number up to the end of the batch. For the + // same base file, the deleted rows are in ascending order in the same delete + // file + int64_t rowNumberUpperBound = splitOffset_ + baseReadOffset + size; + + // Finish unused delete positions from last batch + if (deletePositionsVector_ && + deletePositionsOffset_ < deletePositionsVector_->size()) { + readDeletePositionsVec(baseReadOffset, rowNumberUpperBound, deleteBitmap); + + if (readFinishedForBatch(rowNumberUpperBound)) { + return; + } + } + + uint64_t numRowsToRead = std::min(size, deleteFile_.recordCount); + while (true) { + std::optional deletesResult = + deleteDataSource_->next(numRowsToRead, blockingFuture); + + if (!deletesResult.has_value()) { + return; + } + + auto data = deletesResult.value(); + if (data) { + if (data->size() > 0) { + VELOX_CHECK(data->childrenSize() > 0); + data->loadedVector(); + + deletePositionsVector_ = data->childAt(1); + deletePositionsOffset_ = 0; + + readDeletePositionsVec( + baseReadOffset, rowNumberUpperBound, deleteBitmap); + + if (readFinishedForBatch(rowNumberUpperBound)) { + return; + } + } + continue; + } else { + endOfFile_ = true; + return; + } + } +} + +bool DeleteFileReader::endOfFile() { + return endOfFile_; +} + +void DeleteFileReader::readDeletePositionsVec( + uint64_t baseReadOffset, + int64_t rowNumberUpperBound, + int8_t* deleteBitmap) { + // Convert the positions in file into positions relative to the start of the + // split. + const int64_t* deletePositions = + deletePositionsVector_->as>()->rawValues(); + int64_t offset = baseReadOffset + splitOffset_; + while (deletePositionsOffset_ < deletePositionsVector_->size() && + deletePositions[deletePositionsOffset_] < rowNumberUpperBound) { + bits::setBit( + deleteBitmap, deletePositions[deletePositionsOffset_] - offset); + deletePositionsOffset_++; + } +} + +bool DeleteFileReader::readFinishedForBatch(int64_t rowNumberUpperBound) { + const int64_t* deletePositions = + deletePositionsVector_->as>()->rawValues(); + if (deletePositionsOffset_ < deletePositionsVector_->size() && + deletePositions[deletePositionsOffset_] >= rowNumberUpperBound) { + return true; + } + return false; +} + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/DeleteFileReader.h b/velox/connectors/hive/iceberg/DeleteFileReader.h new file mode 100644 index 000000000000..140b6f3d8889 --- /dev/null +++ b/velox/connectors/hive/iceberg/DeleteFileReader.h @@ -0,0 +1,84 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "velox/connectors/hive/FileHandle.h" +#include "velox/connectors/hive/HiveDataSource.h" + +#include +#include + +namespace facebook::velox::cache { +class AsyncDataCache; +} + +namespace facebook::velox::connector { +class ConnectorQueryCtx; +} + +namespace facebook::velox::core { +class ExpressionEvaluator; +} + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; + +class DeleteFileReader { + public: + DeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::string& baseFilePath, + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + ConnectorQueryCtx* connectorQueryCtx, + uint64_t splitOffset, + const std::string& connectorId); + + void readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + int8_t* deleteBitmap); + + bool endOfFile(); + + private: + void createPositionalDeleteDataSource( + const IcebergDeleteFile& deleteFile, + const std::string& connectorId); + + void readDeletePositionsVec( + uint64_t baseReadOffset, + int64_t rowNumberUpperBound, + int8_t* deleteBitmap); + + bool readFinishedForBatch(int64_t rowNumberUpperBound); + + const IcebergDeleteFile& deleteFile_; + const std::string& baseFilePath_; + FileHandleFactory* const fileHandleFactory_; + folly::Executor* const executor_; + ConnectorQueryCtx* const connectorQueryCtx_; + uint64_t splitOffset_; + + std::unique_ptr deleteDataSource_; + VectorPtr deletePositionsVector_; + uint64_t deletePositionsOffset_; + bool endOfFile_; +}; + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergDeleteFile.h b/velox/connectors/hive/iceberg/IcebergDeleteFile.h new file mode 100644 index 000000000000..1cff2b8efbe2 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergDeleteFile.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/dwio/common/Options.h" + +#include +#include +#include + +namespace facebook::velox::connector::hive::iceberg { + +enum class FileContent { + kData, + kPositionalDeletes, + kEqualityDeletes, +}; + +struct IcebergDeleteFile { + FileContent content; + const std::string filePath; + dwio::common::FileFormat fileFormat; + uint64_t recordCount; + uint64_t fileSizeInBytes; + // The field ids for the delete columns for equality delete files + std::vector equalityFieldIds; + // The lower bounds of the in-file positions for the deleted rows, identified + // by each column's field id. E.g. The deleted rows for a column with field id + // 1 is in range [10, 50], where 10 and 50 are the deleted row positions in + // the data file, then lowerBounds would contain entry <1, "10"> + std::unordered_map lowerBounds; + // The upper bounds of the in-file positions for the deleted rows, identified + // by each column's field id. E.g. 
The deleted rows for a column with field id + // 1 is in range [10, 50], then upperBounds will contain entry <1, "50"> + std::unordered_map upperBounds; + + IcebergDeleteFile( + FileContent _content, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _recordCount, + uint64_t _fileSizeInBytes, + std::vector _equalityFieldIds = {}, + std::unordered_map _lowerBounds = {}, + std::unordered_map _upperBounds = {}) + : content(_content), + filePath(_filePath), + fileFormat(_fileFormat), + recordCount(_recordCount), + fileSizeInBytes(_fileSizeInBytes), + equalityFieldIds(_equalityFieldIds), + lowerBounds(_lowerBounds), + upperBounds(_upperBounds) {} +}; + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergMetadataColumns.h b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h new file mode 100644 index 000000000000..d4442321e53a --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/type/Type.h" + +#include + +namespace facebook::velox::connector::hive::iceberg { + +const std::string kIcebergDeleteFilePathColumn = "file_path"; +const std::string kIcebergRowPositionColumn = "pos"; + +struct IcebergMetadataColumn { + int id; + std::string name; + std::shared_ptr type; + std::string doc; + + IcebergMetadataColumn( + int _id, + const std::string& _name, + std::shared_ptr _type, + const std::string& _doc) + : id(_id), name(_name), type(_type), doc(_doc) {} +}; + +#define ICEBERG_DELETE_FILE_PATH_COLUMN() \ + std::make_shared( \ + 2147483546, \ + kIcebergDeleteFilePathColumn, \ + VARCHAR(), \ + "Path of a file in which a deleted row is stored"); + +#define ICEBERG_DELETE_FILE_POSITIONS_COLUMN() \ + std::make_shared( \ + 2147483545, \ + kIcebergRowPositionColumn, \ + BIGINT(), \ + "Ordinal position of a deleted row in the data file"); + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergSplit.cpp b/velox/connectors/hive/iceberg/IcebergSplit.cpp new file mode 100644 index 000000000000..fe256b073b17 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplit.cpp @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/connectors/hive/iceberg/IcebergSplit.h" + +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" + +namespace facebook::velox::connector::hive::iceberg { + +HiveIcebergSplit::HiveIcebergSplit( + const std::string& _connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start, + uint64_t _length, + const std::unordered_map>& + _partitionKeys, + std::optional _tableBucketNumber, + const std::unordered_map& _customSplitInfo, + const std::shared_ptr& _extraFileInfo) + : HiveConnectorSplit( + _connectorId, + _filePath, + _fileFormat, + _start, + _length, + _partitionKeys, + _tableBucketNumber) { + // TODO: Deserialize _extraFileInfo to get deleteFiles; +} + +// For tests only +HiveIcebergSplit::HiveIcebergSplit( + const std::string& _connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start, + uint64_t _length, + const std::unordered_map>& + _partitionKeys, + std::optional _tableBucketNumber, + const std::unordered_map& _customSplitInfo, + const std::shared_ptr& _extraFileInfo, + std::vector _deletes) + : HiveConnectorSplit( + _connectorId, + _filePath, + _fileFormat, + _start, + _length, + _partitionKeys, + _tableBucketNumber, + _customSplitInfo, + _extraFileInfo), + deleteFiles(_deletes) {} +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergSplit.h b/velox/connectors/hive/iceberg/IcebergSplit.h new file mode 100644 index 000000000000..25f1e6a4e4b1 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplit.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include "velox/connectors/hive/HiveConnectorSplit.h" + +#include + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; + +struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { + std::vector deleteFiles; + + HiveIcebergSplit( + const std::string& connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start = 0, + uint64_t _length = std::numeric_limits::max(), + const std::unordered_map>& + _partitionKeys = {}, + std::optional _tableBucketNumber = std::nullopt, + const std::unordered_map& _customSplitInfo = {}, + const std::shared_ptr& _extraFileInfo = {}); + + // For tests only + HiveIcebergSplit( + const std::string& connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start = 0, + uint64_t _length = std::numeric_limits::max(), + const std::unordered_map>& + _partitionKeys = {}, + std::optional _tableBucketNumber = std::nullopt, + const std::unordered_map& _customSplitInfo = {}, + const std::shared_ptr& _extraFileInfo = {}, + std::vector deletes = {}); +}; + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp new file mode 100644 index 000000000000..5c4cbea344ec --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -0,0 +1,105 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/IcebergSplitReader.h" + +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/dwio/common/BufferUtil.h" + +using namespace facebook::velox::dwio::common; + +namespace facebook::velox::connector::hive::iceberg { + +IcebergSplitReader::IcebergSplitReader( + std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, + const RowTypePtr readerOutputType, + std::unordered_map>* + partitionKeys, + FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + std::shared_ptr ioStats) + : SplitReader( + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + connectorQueryCtx, + executor, + ioStats) {} + +void IcebergSplitReader::prepareSplit( + std::shared_ptr metadataFilter, + dwio::common::RuntimeStatistics& runtimeStats) { + SplitReader::prepareSplit(metadataFilter, runtimeStats); + baseReadOffset_ = 0; + deleteFileReaders_.clear(); + splitOffset_ = baseRowReader_->nextRowNumber(); + + // TODO: Deserialize the std::vector deleteFiles. For now + // we assume it's already deserialized. 
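+ // One DeleteFileReader is created per delete file below; next() drains them + // and drops each reader once its file is fully consumed.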
+ std::shared_ptr<HiveIcebergSplit> icebergSplit = + std::dynamic_pointer_cast<HiveIcebergSplit>(hiveSplit_); + + const auto& deleteFiles = icebergSplit->deleteFiles; + for (const auto& deleteFile : deleteFiles) { + deleteFileReaders_.push_back(std::make_unique<DeleteFileReader>( + deleteFile, + hiveSplit_->filePath, + fileHandleFactory_, + executor_, + connectorQueryCtx_, + splitOffset_, + hiveSplit_->connectorId)); + } +} + +uint64_t IcebergSplitReader::next(int64_t size, VectorPtr& output) { + Mutation mutation; + mutation.deletedRows = nullptr; + + if (!deleteFileReaders_.empty()) { + auto numBytes = bits::nbytes(size); + dwio::common::ensureCapacity<int8_t>( + deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool()); + std::memset((void*)deleteBitmap_->as<int8_t>(), 0L, numBytes); + + // Advance the iterator manually: erase() already moves it to the next + // element, so incrementing unconditionally would skip a reader (and + // increment past end() when the last reader is erased). + for (auto iter = deleteFileReaders_.begin(); + iter != deleteFileReaders_.end();) { + (*iter)->readDeletePositions( + baseReadOffset_, size, deleteBitmap_->asMutable<int8_t>()); + if ((*iter)->endOfFile()) { + iter = deleteFileReaders_.erase(iter); + } else { + ++iter; + } + } + + deleteBitmap_->setSize(numBytes); + mutation.deletedRows = deleteBitmap_->as<uint64_t>(); + } + + auto rowsScanned = baseRowReader_->next(size, output, &mutation); + baseReadOffset_ += rowsScanned; + + return rowsScanned; +} + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h new file mode 100644 index 000000000000..520ff5e9aa4b --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#pragma once + +#include "velox/connectors/hive/SplitReader.h" +#include "velox/connectors/hive/iceberg/DeleteFileReader.h" + +namespace facebook::velox::connector { +class ColumnHandle; +} + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; + +class IcebergSplitReader : public SplitReader { + public: + IcebergSplitReader( + std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, + const RowTypePtr readerOutputType, + std::unordered_map>* + partitionKeys, + FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + std::shared_ptr ioStats); + + ~IcebergSplitReader() override = default; + + void prepareSplit( + std::shared_ptr metadataFilter, + dwio::common::RuntimeStatistics& runtimeStats) override; + + uint64_t next(int64_t size, VectorPtr& output) override; + + private: + // The read offset to the beginning of the split in number of rows for the + // current batch for the base data file + uint64_t baseReadOffset_; + // The file position for the first row in the split + uint64_t splitOffset_; + std::list> deleteFileReaders_; + BufferPtr deleteBitmap_; +}; +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/cmake/Config.cmake.in b/velox/connectors/hive/iceberg/cmake/Config.cmake.in new file mode 100644 index 000000000000..0bf51ecc28f5 --- /dev/null +++ b/velox/connectors/hive/iceberg/cmake/Config.cmake.in @@ -0,0 +1,4 @@ +@PACKAGE_INIT@ + +include("${CMAKE_CURRENT_LIST_DIR}/iceberg.cmake") +check_required_components("@PROJECT_NAME@") diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt new file mode 100644 index 000000000000..63603c724ec2 --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt @@ -0,0 +1,34 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +if(NOT VELOX_DISABLE_GOOGLETEST) + + add_executable(velox_hive_iceberg_test IcebergReadTest.cpp) + add_test(velox_hive_iceberg_test velox_hive_iceberg_test) + + target_link_libraries( + velox_hive_iceberg_test + velox_hive_connector + velox_hive_iceberg_splitreader + velox_hive_partition_function + velox_dwio_common_exception + velox_dwio_common_test_utils + velox_dwio_dwrf_proto + velox_vector_test_lib + velox_exec + velox_exec_test_lib + Folly::folly + gtest + gtest_main) + +endif() diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp new file mode 100644 index 000000000000..3b6c24d38d9a --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -0,0 +1,280 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/exec/PlanNodeStats.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +#include + +using namespace facebook::velox::exec::test; +using namespace facebook::velox::exec; +using namespace facebook::velox::dwio::common; +using namespace facebook::velox::test; + +namespace facebook::velox::connector::hive::iceberg { + +class HiveIcebergTest : public HiveConnectorTestBase { + public: + void assertPositionalDeletes( + const std::vector<int64_t>& deleteRows, + bool multipleBaseFiles = false) { + assertPositionalDeletes( + deleteRows, + "SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRows) + ")", + multipleBaseFiles); + } + void assertPositionalDeletes( + const std::vector<int64_t>& deleteRows, + std::string duckdbSql, + bool multipleBaseFiles = false) { + std::shared_ptr<TempFilePath> dataFilePath = writeDataFile(rowCount); + + std::mt19937 gen{0}; + int64_t numDeleteRowsBefore = + multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0; + int64_t numDeleteRowsAfter = + multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0; + std::shared_ptr<TempFilePath> deleteFilePath = writePositionDeleteFile( + dataFilePath->path, + deleteRows, + numDeleteRowsBefore, + numDeleteRowsAfter); + + IcebergDeleteFile deleteFile( + FileContent::kPositionalDeletes, + deleteFilePath->path, + fileFormat_, + deleteRows.size() + numDeleteRowsBefore + numDeleteRowsAfter, + testing::internal::GetFileSize( + std::fopen(deleteFilePath->path.c_str(), "r"))); + + auto icebergSplit = makeIcebergSplit(dataFilePath->path, {deleteFile}); + + auto plan = tableScanNode(); + auto task = OperatorTestBase::assertQuery(plan, {icebergSplit}, duckdbSql); + + auto planStats = toPlanStats(task->taskStats()); + auto scanNodeId = plan->id(); + auto it = planStats.find(scanNodeId); + ASSERT_TRUE(it != planStats.end()); + ASSERT_TRUE(it->second.peakMemoryBytes > 0); + } + + std::vector<int64_t> makeRandomDeleteRows(int32_t maxRowNumber) { + std::mt19937 gen{0}; + std::vector<int64_t> deleteRows; + for (int i = 0; i < maxRowNumber; i++) { + if (folly::Random::rand32(0, 10, gen) > 8) { + deleteRows.push_back(i); + } + } + return deleteRows; + } + + std::vector<int64_t> makeSequenceRows(int32_t maxRowNumber) { + std::vector<int64_t> deleteRows; + deleteRows.resize(maxRowNumber); + std::iota(deleteRows.begin(), deleteRows.end(), 0); + return deleteRows; + } + + const static int rowCount = 20000; + + private: + std::shared_ptr<HiveIcebergSplit> makeIcebergSplit( + const std::string& dataFilePath, + const std::vector<IcebergDeleteFile>& deleteFiles = {}) { + std::unordered_map<std::string, std::optional<std::string>> partitionKeys; + std::unordered_map<std::string, std::string> customSplitInfo; + customSplitInfo["table_format"] = "hive_iceberg"; + + auto file = filesystems::getFileSystem(dataFilePath, nullptr) + ->openFileForRead(dataFilePath); + const int64_t fileSize = file->size(); + + return std::make_shared<HiveIcebergSplit>( + kHiveConnectorId, + dataFilePath, + fileFormat_, + 0, + fileSize, + partitionKeys, + std::nullopt, + customSplitInfo, + nullptr, + deleteFiles); + } + + std::vector<RowVectorPtr> makeVectors(int32_t count, int32_t rowsPerVector) { + std::vector<RowVectorPtr> vectors; + + for (int i = 0; i < count; i++) { + auto data = makeSequenceRows(rowsPerVector); + VectorPtr c0 = vectorMaker_.flatVector<int64_t>(data); + vectors.push_back(makeRowVector({"c0"}, {c0})); + } + + return vectors; + } + + std::shared_ptr<TempFilePath> writeDataFile(uint64_t numRows) { + auto dataVectors = makeVectors(1, numRows); + + auto dataFilePath = TempFilePath::create(); + writeToFile(dataFilePath->path, dataVectors); + createDuckDbTable(dataVectors); + return dataFilePath; + } + + std::shared_ptr<TempFilePath> writePositionDeleteFile( + const std::string& dataFilePath, + const std::vector<int64_t>& deleteRows, + int64_t numRowsBefore = 0, + int64_t numRowsAfter = 0) { + // If numRowsBefore or numRowsAfter is non-zero, also write delete rows + // pointing at other base files before and after the target base file. + uint32_t numDeleteRows = numRowsBefore + deleteRows.size() + numRowsAfter; + + auto filePathVector = + vectorMaker_.flatVector<StringView>(numDeleteRows, [&](auto row) { + if (row < numRowsBefore) { + std::string dataFilePathBefore = dataFilePath + "_before"; + return StringView(dataFilePathBefore); + } else if ( + row >= numRowsBefore && row < deleteRows.size() + numRowsBefore) { + return StringView(dataFilePath); + } else if ( + row >= deleteRows.size() + numRowsBefore && row < numDeleteRows) { + std::string dataFilePathAfter = dataFilePath + "_after"; + return StringView(dataFilePathAfter); + } else { + return StringView(); + } + }); + + std::vector<int64_t> deleteRowsVec; + deleteRowsVec.reserve(numDeleteRows); + + if (numRowsBefore > 0) { + auto rowsBefore = makeSequenceRows(numRowsBefore); + deleteRowsVec.insert( + deleteRowsVec.end(), rowsBefore.begin(), rowsBefore.end()); + } + deleteRowsVec.insert( + deleteRowsVec.end(), deleteRows.begin(), deleteRows.end()); + if (numRowsAfter > 0) { + auto rowsAfter = makeSequenceRows(numRowsAfter); + deleteRowsVec.insert( + deleteRowsVec.end(), rowsAfter.begin(), rowsAfter.end()); + } + + auto deletePositionsVector = + vectorMaker_.flatVector<int64_t>(deleteRowsVec); + RowVectorPtr deleteFileVectors = makeRowVector( + {pathColumn_->name, posColumn_->name}, + {filePathVector, deletePositionsVector}); + + auto deleteFilePath = TempFilePath::create(); + writeToFile(deleteFilePath->path, deleteFileVectors); + + return deleteFilePath; + } + + std::string makeNotInList(const std::vector<int64_t>& deleteRows) { + if (deleteRows.empty()) { + return ""; + } + + return std::accumulate( + deleteRows.begin() + 1, + deleteRows.end(), + std::to_string(deleteRows[0]), + [](const std::string& a, int64_t b) { + return a + ", " + std::to_string(b); + }); + } + + std::shared_ptr<Task> assertQuery( + const core::PlanNodePtr& plan, + std::shared_ptr<TempFilePath> dataFilePath, + const std::vector<IcebergDeleteFile>& deleteFiles, + const std::string& duckDbSql) { + auto icebergSplit = makeIcebergSplit(dataFilePath->path, deleteFiles); + return OperatorTestBase::assertQuery(plan, {icebergSplit}, duckDbSql); + } + + core::PlanNodePtr tableScanNode() { + return PlanBuilder(pool_.get()).tableScan(rowType_).planNode(); + } + + private: + dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF}; + RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})}; + std::shared_ptr<IcebergMetadataColumn> pathColumn_ = + ICEBERG_DELETE_FILE_PATH_COLUMN(); + std::shared_ptr<IcebergMetadataColumn> posColumn_ = + ICEBERG_DELETE_FILE_POSITIONS_COLUMN(); +}; +
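+// Each case below drives assertPositionalDeletes(): write a DWRF base file, +// write a positional delete file against it, wrap both in a HiveIcebergSplit +// whose customSplitInfo["table_format"] is "hive_iceberg", and compare the +// scan against the equivalent DuckDB query.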
+TEST_F(HiveIcebergTest, positionalDeletesSingleBaseFile) {
+  folly::SingletonVault::singleton()->registrationComplete();
+
+  // Delete rows 0, 1, 2, 3 from the first batch out of two.
+  assertPositionalDeletes({0, 1, 2, 3});
+  // Delete the first and last rows in each batch (10000 rows per batch).
+  assertPositionalDeletes({0, 9999, 10000, 19999});
+  // Delete several rows in the second batch (10000 rows per batch).
+  assertPositionalDeletes({10000, 10002, 19999});
+  // Delete random rows.
+  assertPositionalDeletes(makeRandomDeleteRows(rowCount));
+  // Delete 0 rows.
+  assertPositionalDeletes({}, "SELECT * FROM tmp", false);
+  // Delete all rows.
+  assertPositionalDeletes(
+      makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0", false);
+  // Delete rows that don't exist.
+  assertPositionalDeletes({20000, 29999});
+}
+
+TEST_F(HiveIcebergTest, positionalDeletesMultipleBaseFiles) {
+  folly::SingletonVault::singleton()->registrationComplete();
+
+  // Delete rows 0, 1, 2, 3 from the first batch out of two.
+  assertPositionalDeletes({0, 1, 2, 3}, true);
+  // Delete the first and last rows in each batch (10000 rows per batch).
+  assertPositionalDeletes({0, 9999, 10000, 19999}, true);
+  // Delete several rows in the second batch (10000 rows per batch).
+  assertPositionalDeletes({10000, 10002, 19999}, true);
+  // Delete random rows.
+  assertPositionalDeletes(makeRandomDeleteRows(rowCount), true);
+  // Delete 0 rows.
+  assertPositionalDeletes({}, "SELECT * FROM tmp", true);
+  // Delete all rows.
+  assertPositionalDeletes(
+      makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0", true);
+  // Delete rows that don't exist.
+  assertPositionalDeletes({20000, 29999}, true);
+}
+
+} // namespace facebook::velox::connector::hive::iceberg
diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp
index 57dfcded14c0..a4bf93471e46 100644
--- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp
+++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp
@@ -14,18 +14,17 @@
  * limitations under the License.
 */
 
-#include <folly/init/Init.h>
-#include "gtest/gtest.h"
-#include "velox/common/file/FileSystems.h"
 #include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h"
 #include "velox/connectors/hive/storage_adapters/s3fs/tests/MinioServer.h"
-#include "velox/dwio/parquet/reader/ParquetReader.h"
 #include "velox/exec/TableWriter.h"
 #include "velox/exec/tests/utils/AssertQueryBuilder.h"
 #include "velox/exec/tests/utils/HiveConnectorTestBase.h"
 #include "velox/exec/tests/utils/PlanBuilder.h"
 
+#include <folly/init/Init.h>
+#include <gtest/gtest.h>
+
 using namespace facebook::velox;
 using namespace facebook::velox::core;
 using namespace facebook::velox::exec;
diff --git a/velox/dwio/dwrf/test/CMakeLists.txt b/velox/dwio/dwrf/test/CMakeLists.txt
index 063fdc8b26da..026b5bc48df2 100644
--- a/velox/dwio/dwrf/test/CMakeLists.txt
+++ b/velox/dwio/dwrf/test/CMakeLists.txt
@@ -347,6 +347,7 @@ add_test(velox_dwrf_e2e_filter_test velox_dwrf_e2e_filter_test)
 target_link_libraries(
   velox_dwrf_e2e_filter_test
   velox_e2e_filter_test_base
+  velox_hive_connector
   velox_link_libs
   velox_test_util
   Folly::folly
diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp
index 11d86cd96cfb..ddabb8fa92fd 100644
--- a/velox/dwio/parquet/reader/PageReader.cpp
+++ b/velox/dwio/parquet/reader/PageReader.cpp
@@ -243,10 +243,10 @@ void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) {
   auto pageEnd = pageData_ + pageHeader.uncompressed_page_size;
   if (maxRepeat_ > 0) {
     uint32_t repeatLength = readField<int32_t>(pageData_);
-    repeatDecoder_ = std::make_unique<arrow::util::RleDecoder>(
+    repeatDecoder_ = std::make_unique<::arrow::util::RleDecoder>(
         reinterpret_cast<const uint8_t*>(pageData_),
         repeatLength,
-        arrow::bit_util::NumRequiredBits(maxRepeat_));
+        ::arrow::bit_util::NumRequiredBits(maxRepeat_));
     pageData_ += repeatLength;
   }
 
@@ -257,12 +257,12 @@ void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) {
       defineDecoder_ = std::make_unique<RleBpDecoder>(
           pageData_,
           pageData_ + defineLength,
-          arrow::bit_util::NumRequiredBits(maxDefine_));
+          ::arrow::bit_util::NumRequiredBits(maxDefine_));
     }
-    wideDefineDecoder_ = std::make_unique<arrow::util::RleDecoder>(
+    wideDefineDecoder_ = std::make_unique<::arrow::util::RleDecoder>(
         reinterpret_cast<const uint8_t*>(pageData_),
         defineLength,
-        arrow::bit_util::NumRequiredBits(maxDefine_));
+        ::arrow::bit_util::NumRequiredBits(maxDefine_));
     pageData_ += defineLength;
   }
   encodedDataSize_ = pageEnd - pageData_;
@@ -301,17 +301,17 @@ void PageReader::prepareDataPageV2(const PageHeader& pageHeader, int64_t row) {
   pageData_ = readBytes(bytes, pageBuffer_);
 
   if (repeatLength) {
-    repeatDecoder_ = std::make_unique<arrow::util::RleDecoder>(
+    repeatDecoder_ = std::make_unique<::arrow::util::RleDecoder>(
         reinterpret_cast<const uint8_t*>(pageData_),
         repeatLength,
-        arrow::bit_util::NumRequiredBits(maxRepeat_));
+        ::arrow::bit_util::NumRequiredBits(maxRepeat_));
   }
   if (maxDefine_ > 0) {
     defineDecoder_ = std::make_unique<RleBpDecoder>(
         pageData_ + repeatLength,
         pageData_ + repeatLength + defineLength,
-        arrow::bit_util::NumRequiredBits(maxDefine_));
+        ::arrow::bit_util::NumRequiredBits(maxDefine_));
   }
   auto levelsSize = repeatLength + defineLength;
   pageData_ += levelsSize;
diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h
index c079b6a64c38..0fb4d3dc35d1 100644
--- a/velox/dwio/parquet/reader/PageReader.h
+++ b/velox/dwio/parquet/reader/PageReader.h
@@ -356,8 +356,8 @@ class PageReader {
   // Decoder for single bit definition levels. The arrow decoders are used for
   // multibit levels pending fixing RleBpDecoder for the case.
   std::unique_ptr<RleBpDecoder> defineDecoder_;
-  std::unique_ptr<arrow::util::RleDecoder> repeatDecoder_;
-  std::unique_ptr<arrow::util::RleDecoder> wideDefineDecoder_;
+  std::unique_ptr<::arrow::util::RleDecoder> repeatDecoder_;
+  std::unique_ptr<::arrow::util::RleDecoder> wideDefineDecoder_;
 
   // True for a leaf column for which repdefs are loaded for the whole column
   // chunk. This is typically the leftmost leaf of a list. Other leaves under
diff --git a/velox/dwio/parquet/reader/ParquetTypeWithId.h b/velox/dwio/parquet/reader/ParquetTypeWithId.h
index 08b1449a8f19..eca23f4221a8 100644
--- a/velox/dwio/parquet/reader/ParquetTypeWithId.h
+++ b/velox/dwio/parquet/reader/ParquetTypeWithId.h
@@ -16,10 +16,11 @@
 
 #pragma once
 
-#include
 #include "velox/dwio/common/TypeWithId.h"
 #include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"
 
+#include
+
 namespace facebook::velox::parquet {
 
 /// Describes what to extract from leaf repetition / definition
diff --git a/velox/exec/benchmarks/CMakeLists.txt b/velox/exec/benchmarks/CMakeLists.txt
index 45a80e2873af..92fd47a6bb60 100644
--- a/velox/exec/benchmarks/CMakeLists.txt
+++ b/velox/exec/benchmarks/CMakeLists.txt
@@ -37,7 +37,15 @@ add_executable(velox_hash_benchmark HashTableBenchmark.cpp)
 target_link_libraries(velox_hash_benchmark velox_exec velox_exec_test_lib
                       velox_vector_test_lib ${FOLLY_BENCHMARK})
 
-add_executable(velox_sort_benchmark RowContainerSortBenchmark.cpp)
-
-target_link_libraries(velox_sort_benchmark velox_exec velox_exec_test_lib
-                      velox_vector_test_lib ${FOLLY_BENCHMARK})
+if(VELOX_ENABLE_PARQUET)
+  add_executable(velox_sort_benchmark RowContainerSortBenchmark.cpp)
+
+  target_link_libraries(
+    velox_sort_benchmark
+    velox_exec
+    velox_exec_test_lib
+    velox_vector_test_lib
+    ${FOLLY_BENCHMARK}
+    arrow
+    thrift)
+endif()
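Context for the PageReader hunks: the `::arrow` qualification only disambiguates the namespace; the repetition/definition decoders are still sized by the bit width needed to represent the maximum level. The sketch below is a standalone reimplementation, written for illustration and assumed equivalent to `::arrow::bit_util::NumRequiredBits`, showing why a flat nullable column needs a single-bit definition decoder:

```cpp
#include <cstdint>
#include <iostream>

// Smallest bit width that can represent every value in [0, x];
// illustrative stand-in for ::arrow::bit_util::NumRequiredBits.
int numRequiredBits(uint64_t x) {
  int bits = 0;
  while (x > 0) {
    ++bits;
    x >>= 1;
  }
  return bits;
}

int main() {
  // A non-nested nullable Parquet column has maxDefine == 1 and
  // maxRepeat == 0: definition levels fit in one bit and no repetition
  // decoder is created (see prepareDataPageV1 above).
  std::cout << numRequiredBits(1) << "\n"; // 1
  std::cout << numRequiredBits(3) << "\n"; // 2, e.g. two nesting levels
  std::cout << numRequiredBits(7) << "\n"; // 3
  return 0;
}
```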