diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index d7193699336c..c3437e856538 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -642,6 +642,7 @@ bool applyPartitionFilter( VELOX_FAIL( "Bad type {} for partition value: {}", type->kind(), partitionValue); } + return true; } } // namespace @@ -655,7 +656,8 @@ bool testFilters( const std::unordered_map>& partitionKeysHandle) { const auto totalRows = reader->numberOfRows(); - const auto& fileTypeWithId = reader->typeWithId(); + // const auto& fileTypeWithId = reader->typeWithId(); + const auto& fileType = reader->rowType(); const auto& rowType = reader->rowType(); for (const auto& child : scanSpec->children()) { if (child->filter()) { @@ -685,14 +687,15 @@ bool testFilters( return false; } } else { - const auto& typeWithId = fileTypeWithId->childByName(name); - const auto columnStats = reader->columnStatistics(typeWithId->id()); + const auto& type = rowType->findChild(name); + const auto columnStats = + reader->columnStatistics(rowType->getChildIdx(name)); + VLOG(0) << "testFilters column name: " << name + << " type: " << type->kind() << " stats: " << columnStats.get() + << (columnStats ? 
columnStats->toString() : "no stats"); if (columnStats != nullptr && !testFilter( - child->filter(), - columnStats.get(), - totalRows.value(), - typeWithId->type())) { + child->filter(), columnStats.get(), totalRows.value(), type)) { VLOG(1) << "Skipping " << filePath << " based on stats and filter for column " << child->fieldName(); diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 40b72f126dfe..d17e2d40515c 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -210,7 +210,10 @@ std::unique_ptr HiveDataSource::createSplitReader() { ioStats_, fileHandleFactory_, executor_, - scanSpec_); + scanSpec_, + remainingFilterExprSet_, + expressionEvaluator_, + totalRemainingFilterTime_); } std::unique_ptr HiveDataSource::setupBucketConversion() { @@ -283,8 +286,10 @@ void HiveDataSource::addSplit(std::shared_ptr split) { } splitReader_ = createSplitReader(); + // Split reader subclasses may need to use the reader options in prepareSplit // so we initialize it beforehand. 
+ splitReader_->configureReaderOptions(randomSkip_); splitReader_->prepareSplit(metadataFilter_, runtimeStats_, rowIndexColumn_); } @@ -551,6 +556,7 @@ std::shared_ptr HiveDataSource::toWaveDataSource() { void HiveDataSource::registerWaveDelegateHook(WaveDelegateHookFunction hook) { waveDelegateHook_ = hook; } + std::shared_ptr toWaveDataSource(); } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index b7ca9bcb6344..8414d6883078 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -161,7 +161,7 @@ class HiveDataSource : public DataSource { subfields_; SubfieldFilters filters_; std::shared_ptr metadataFilter_; - std::unique_ptr remainingFilterExprSet_; + std::shared_ptr remainingFilterExprSet_; RowVectorPtr emptyOutput_; dwio::common::RuntimeStatistics runtimeStats_; std::atomic totalRemainingFilterTime_{0}; diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index e9bf9387a067..8878485c624b 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -74,7 +74,10 @@ std::unique_ptr SplitReader::create( const std::shared_ptr& ioStats, FileHandleFactory* fileHandleFactory, folly::Executor* executor, - const std::shared_ptr& scanSpec) { + const std::shared_ptr& scanSpec, + std::shared_ptr& remainingFilterExprSet, + core::ExpressionEvaluator* expressionEvaluator, + std::atomic& totalRemainingFilterTime) { // Create the SplitReader based on hiveSplit->customSplitInfo["table_format"] if (hiveSplit->customSplitInfo.count("table_format") > 0 && hiveSplit->customSplitInfo["table_format"] == "hive-iceberg") { @@ -88,7 +91,10 @@ std::unique_ptr SplitReader::create( ioStats, fileHandleFactory, executor, - scanSpec); + scanSpec, + remainingFilterExprSet, + expressionEvaluator, + totalRemainingFilterTime); } else { return std::unique_ptr(new SplitReader( hiveSplit, @@ 
-179,6 +185,11 @@ void SplitReader::resetSplit() { hiveSplit_.reset(); } +std::shared_ptr SplitReader::baseFileSchema() { + VELOX_CHECK_NOT_NULL(baseReader_.get()); + return baseReader_->typeWithId(); +} + int64_t SplitReader::estimatedRowSize() const { if (!baseRowReader_) { return DataSource::kUnknownRowSize; diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index bf3b0e7c330f..f1e4797a57cb 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -65,7 +65,10 @@ class SplitReader { const std::shared_ptr& ioStats, FileHandleFactory* fileHandleFactory, folly::Executor* executor, - const std::shared_ptr& scanSpec); + const std::shared_ptr& scanSpec, + std::shared_ptr& remainingFilterExprSet, + core::ExpressionEvaluator* expressionEvaluator, + std::atomic& totalRemainingFilterTime); virtual ~SplitReader() = default; @@ -89,6 +92,8 @@ class SplitReader { void resetSplit(); + std::shared_ptr baseFileSchema(); + int64_t estimatedRowSize() const; void updateRuntimeStats(dwio::common::RuntimeStatistics& stats) const; diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt index bc78005c91bb..c2721f6c0705 100644 --- a/velox/connectors/hive/iceberg/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -12,8 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-velox_add_library(velox_hive_iceberg_splitreader IcebergSplitReader.cpp - IcebergSplit.cpp PositionalDeleteFileReader.cpp) +velox_add_library( + velox_hive_iceberg_splitreader + IcebergSplitReader.cpp + IcebergSplit.cpp + PositionalDeleteFileReader.cpp + EqualityDeleteFileReader.cpp + FilterUtil.cpp) velox_link_libraries(velox_hive_iceberg_splitreader velox_connector Folly::folly) diff --git a/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp new file mode 100644 index 000000000000..b6f146765972 --- /dev/null +++ b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp @@ -0,0 +1,227 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h" + +#include "velox/connectors/hive/HiveConnectorUtil.h" +#include "velox/connectors/hive/iceberg/FilterUtil.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/dwio/common/ReaderFactory.h" +#include "velox/dwio/common/TypeUtils.h" + +using namespace facebook::velox::common; +using namespace facebook::velox::core; +using namespace facebook::velox::exec; + +namespace facebook::velox::connector::hive::iceberg { + +static constexpr const int kMaxBatchRows = 10'000; + +EqualityDeleteFileReader::EqualityDeleteFileReader( + const IcebergDeleteFile& deleteFile, + std::shared_ptr baseFileSchema, + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + const ConnectorQueryCtx* connectorQueryCtx, + const std::shared_ptr hiveConfig, + std::shared_ptr ioStats, + dwio::common::RuntimeStatistics& runtimeStats, + const std::string& connectorId) + : deleteFile_(deleteFile), + baseFileSchema_(baseFileSchema), + connectorQueryCtx_(connectorQueryCtx), + hiveConfig_(hiveConfig), + deleteSplit_(nullptr), + fileHandleFactory_(fileHandleFactory), + executor_(executor), + pool_(connectorQueryCtx->memoryPool()), + ioStats_(ioStats), + deleteRowReader_(nullptr) { + VELOX_CHECK(deleteFile_.content == FileContent::kEqualityDeletes); + + if (deleteFile_.recordCount == 0) { + return; + } + + std::unordered_set equalityFieldIds( + deleteFile_.equalityFieldIds.begin(), deleteFile_.equalityFieldIds.end()); + auto deleteFieldSelector = [&equalityFieldIds](size_t index) { + return equalityFieldIds.find(static_cast(index)) != + equalityFieldIds.end(); + }; + auto deleteFileSchema = dwio::common::typeutils::buildSelectedType( + baseFileSchema_, deleteFieldSelector); + + rowType_ = std::static_pointer_cast(deleteFileSchema->type()); + + // TODO: push down filter if previous delete file contains this one. E.g. 
+ // previous equality delete file has a=1, and this file also contains + // columns a, then a!=1 can be pushed as a filter when reading this delete + // file. + + auto scanSpec = std::make_shared(""); + scanSpec->addAllChildFields(rowType_->asRow()); + + deleteSplit_ = std::make_shared( + connectorId, + deleteFile_.filePath, + deleteFile_.fileFormat, + 0, + deleteFile_.fileSizeInBytes); + + // Create the Reader and RowReader + + dwio::common::ReaderOptions deleteReaderOpts(pool_); + configureReaderOptions( + deleteReaderOpts, + hiveConfig_, + connectorQueryCtx_, + rowType_, + deleteSplit_); + + auto deleteFileHandleCachePtr = + fileHandleFactory_->generate(deleteFile_.filePath); + auto deleteFileInput = createBufferedInput( + *deleteFileHandleCachePtr, + deleteReaderOpts, + connectorQueryCtx_, + ioStats_, + executor_); + + auto deleteReader = + dwio::common::getReaderFactory(deleteReaderOpts.fileFormat()) + ->createReader(std::move(deleteFileInput), deleteReaderOpts); + + dwio::common::RowReaderOptions deleteRowReaderOpts; + configureRowReaderOptions( + {}, + scanSpec, + nullptr, + rowType_, + deleteSplit_, + hiveConfig_, + connectorQueryCtx_->sessionProperties(), + deleteRowReaderOpts); + + deleteRowReader_.reset(); + deleteRowReader_ = deleteReader->createRowReader(deleteRowReaderOpts); +} + +void EqualityDeleteFileReader::readDeleteValues( + SubfieldFilters& subfieldFilters, + std::vector& expressionInputs) { + VELOX_CHECK(deleteRowReader_); + VELOX_CHECK(deleteSplit_); + + if (!deleteValuesOutput_) { + deleteValuesOutput_ = BaseVector::create(rowType_, 0, pool_); + } + + // TODO: verify if the field is a sub-field. Velox currently doesn't support + // pushing down filters to sub-fields + if (rowType_->size() == 1) { + // Construct the IN list filter that can be pushed down to the base file + // readers, then update the baseFileScanSpec. 
+ readSingleColumnDeleteValues(subfieldFilters); + } else { + readMultipleColumnDeleteValues(expressionInputs); + } + + deleteSplit_.reset(); +} + +void EqualityDeleteFileReader::readSingleColumnDeleteValues( + SubfieldFilters& subfieldFilters) { + std::unique_ptr filter = std::make_unique(); + while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) { + if (deleteValuesOutput_->size() == 0) { + continue; + } + + deleteValuesOutput_->loadedVector(); + auto vector = + std::dynamic_pointer_cast(deleteValuesOutput_)->childAt(0); + auto name = rowType_->nameOf(0); + + auto typeKind = vector->type()->kind(); + VELOX_CHECK( + typeKind != TypeKind::DOUBLE && typeKind != TypeKind::REAL, + "Iceberg does not allow DOUBLE or REAL columns as the equality delete columns: {} : {}", + name, + typeKind); + + auto notExistsFilter = + createNotExistsFilter(vector, 0, deleteValuesOutput_->size(), typeKind); + filter = filter->mergeWith(notExistsFilter.get()); + } + + if (filter->kind() != FilterKind::kAlwaysTrue) { + if (subfieldFilters.find(common::Subfield(rowType_->nameOf(0))) != + subfieldFilters.end()) { + subfieldFilters[common::Subfield(rowType_->nameOf(0))] = + subfieldFilters[common::Subfield(rowType_->nameOf(0))]->mergeWith( + filter.get()); + } else { + subfieldFilters[common::Subfield(rowType_->nameOf(0))] = + std::move(filter); + } + } +} + +void EqualityDeleteFileReader::readMultipleColumnDeleteValues( + std::vector& expressionInputs) { + auto numDeleteFields = rowType_->size(); + VELOX_CHECK_GT( + numDeleteFields, + 0, + "Iceberg equality delete file should have at least one field."); + + // TODO: logical expression simplifications + while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) { + if (deleteValuesOutput_->size() == 0) { + continue; + } + + deleteValuesOutput_->loadedVector(); + auto rowVector = std::dynamic_pointer_cast(deleteValuesOutput_); + auto numDeletedValues = rowVector->childAt(0)->size(); + + for (int i = 0; i < 
numDeletedValues; i++) { + std::vector disconjunctInputs; + + for (int j = 0; j < numDeleteFields; j++) { + auto type = rowType_->childAt(j); + auto name = rowType_->nameOf(j); + auto value = BaseVector::wrapInConstant(1, i, rowVector->childAt(j)); + + std::vector isNotEqualInputs; + isNotEqualInputs.push_back( + std::make_shared(type, name)); + isNotEqualInputs.push_back(std::make_shared(value)); + auto isNotEqualExpr = + std::make_shared(BOOLEAN(), isNotEqualInputs, "neq"); + + disconjunctInputs.push_back(isNotEqualExpr); + } + + auto disconjunctNotEqualExpr = + std::make_shared(BOOLEAN(), disconjunctInputs, "or"); + expressionInputs.push_back(disconjunctNotEqualExpr); + } + } +} + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h new file mode 100644 index 000000000000..29160b56a38f --- /dev/null +++ b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "velox/connectors/Connector.h" +#include "velox/connectors/hive/FileHandle.h" +#include "velox/connectors/hive/HiveConfig.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/dwio/common/Reader.h" +#include "velox/expression/Expr.h" + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; + +using SubfieldFilters = + std::unordered_map>; + +class EqualityDeleteFileReader { + public: + EqualityDeleteFileReader( + const IcebergDeleteFile& deleteFile, + std::shared_ptr baseFileSchema, + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + const ConnectorQueryCtx* connectorQueryCtx, + const std::shared_ptr hiveConfig, + std::shared_ptr ioStats, + dwio::common::RuntimeStatistics& runtimeStats, + const std::string& connectorId); + + void readDeleteValues( + SubfieldFilters& subfieldFilters, + std::vector& typedExpressions); + + private: + void readSingleColumnDeleteValues(SubfieldFilters& subfieldFilters); + + void readMultipleColumnDeleteValues( + std::vector& expressionInputs); + + const IcebergDeleteFile& deleteFile_; + const std::shared_ptr baseFileSchema_; + const ConnectorQueryCtx* const connectorQueryCtx_; + const std::shared_ptr hiveConfig_; + std::shared_ptr deleteSplit_; + + FileHandleFactory* const fileHandleFactory_; + folly::Executor* const executor_; + memory::MemoryPool* const pool_; + const std::shared_ptr ioStats_; + + RowTypePtr rowType_; + std::unique_ptr deleteRowReader_; + VectorPtr deleteValuesOutput_; +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/FilterUtil.cpp b/velox/connectors/hive/iceberg/FilterUtil.cpp new file mode 100644 index 000000000000..e1d988f7e179 --- /dev/null +++ b/velox/connectors/hive/iceberg/FilterUtil.cpp @@ -0,0 +1,122 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/connectors/hive/iceberg/FilterUtil.h" + +namespace facebook::velox::connector::hive::iceberg { + +using namespace facebook::velox::exec; +using namespace facebook::velox::core; + +// Read 'size' values from 'valuesVector' starting at 'offset', de-duplicate, +// remove nulls and sort. Return a list of unique non-null values sorted in +// ascending order and a boolean indicating whether there were any null values. 
+template +std::pair, bool> toValues( + const VectorPtr& valuesVector, + vector_size_t offset, + vector_size_t size) { + auto simpleValues = valuesVector->as>(); + + bool nullAllowed = false; + std::vector values; + values.reserve(size); + + for (auto i = offset; i < offset + size; i++) { + if (simpleValues->isNullAt(i)) { + nullAllowed = true; + } else { + if constexpr (std::is_same_v) { + values.emplace_back(simpleValues->valueAt(i).toMillis()); + } else { + values.emplace_back(simpleValues->valueAt(i)); + } + } + } + + // In-place sort, remove duplicates, and later std::move to save memory + std::sort(values.begin(), values.end()); + auto last = std::unique(values.begin(), values.end()); + values.resize(std::distance(values.begin(), last)); + + return {std::move(values), nullAllowed}; +} + +template +std::unique_ptr createNegatedBigintValuesFilter( + const VectorPtr& valuesVector, + vector_size_t offset, + vector_size_t size) { + auto valuesPair = toValues(valuesVector, offset, size); + + const auto& values = valuesPair.first; + bool hasNull = valuesPair.second; + + return common::createNegatedBigintValues(values, !hasNull); +} + +std::unique_ptr createNotExistsFilter( + const VectorPtr& elements, + vector_size_t offset, + vector_size_t size, + const TypeKind& type) { + std::unique_ptr filter; + switch (type) { + case TypeKind::HUGEINT: + // TODO: createNegatedHugeintValuesFilter is not implemented yet. 
+ VELOX_NYI("createNegatedHugeintValuesFilter is not implemented yet"); + case TypeKind::BIGINT: + filter = createNegatedBigintValuesFilter(elements, offset, size); + break; + case TypeKind::INTEGER: + filter = createNegatedBigintValuesFilter(elements, offset, size); + break; + case TypeKind::SMALLINT: + filter = createNegatedBigintValuesFilter(elements, offset, size); + break; + case TypeKind::TINYINT: + filter = createNegatedBigintValuesFilter(elements, offset, size); + break; + case TypeKind::BOOLEAN: + // Hack: using BIGINT filter for bool, which is essentially "int1_t". + filter = createNegatedBigintValuesFilter(elements, offset, size); + break; + case TypeKind::TIMESTAMP: + filter = + createNegatedBigintValuesFilter(elements, offset, size); + break; + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: + // TODO: createNegatedBytesValuesFilter is not implemented yet. + VELOX_NYI("createNegatedBytesValuesFilter is not implemented yet"); + case TypeKind::REAL: + case TypeKind::DOUBLE: + VELOX_USER_FAIL( + "Iceberg equality delete column cannot be DOUBLE or FLOAT"); + case TypeKind::UNKNOWN: + [[fallthrough]]; + case TypeKind::ARRAY: + [[fallthrough]]; + case TypeKind::MAP: + [[fallthrough]]; + case TypeKind::ROW: + [[fallthrough]]; + default: + VELOX_UNSUPPORTED( + "Unsupported in-list type {} for NOT EXIST predicate", type); + } + return filter; +} +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/FilterUtil.h b/velox/connectors/hive/iceberg/FilterUtil.h new file mode 100644 index 000000000000..90e446f297a1 --- /dev/null +++ b/velox/connectors/hive/iceberg/FilterUtil.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/expression/Expr.h" +#include "velox/type/Filter.h" +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::connector::hive::iceberg { +std::unique_ptr createNotExistsFilter( + const VectorPtr& elements, + vector_size_t offset, + vector_size_t size, + const TypeKind& type); + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index b7c1f6b52340..9c9677f9c489 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -16,11 +16,13 @@ #include "velox/connectors/hive/iceberg/IcebergSplitReader.h" +#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h" #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" #include "velox/connectors/hive/iceberg/IcebergSplit.h" #include "velox/dwio/common/BufferUtil.h" using namespace facebook::velox::dwio::common; +using namespace facebook::velox::exec; namespace facebook::velox::connector::hive::iceberg { @@ -35,7 +37,10 @@ IcebergSplitReader::IcebergSplitReader( const std::shared_ptr& ioStats, FileHandleFactory* const fileHandleFactory, folly::Executor* executor, - const std::shared_ptr& scanSpec) + const std::shared_ptr& scanSpec, + std::shared_ptr& remainingFilterExprSet, + core::ExpressionEvaluator* expressionEvaluator, + std::atomic& totalRemainingFilterTime) : SplitReader( hiveSplit, hiveTableHandle, @@ -47,31 +52,109 @@ 
IcebergSplitReader::IcebergSplitReader( fileHandleFactory, executor, scanSpec), + originalScanSpec_(nullptr), baseReadOffset_(0), splitOffset_(0), deleteBitmap_(nullptr), - deleteBitmapBitOffset_(0) {} + deleteBitmapBitOffset_(0), + deleteExprSet_(nullptr), + expressionEvaluator_(expressionEvaluator), + totalRemainingFilterTime_(totalRemainingFilterTime) {} + +IcebergSplitReader::~IcebergSplitReader() { + if (originalScanSpec_) { + *scanSpec_ = *originalScanSpec_; + } +} void IcebergSplitReader::prepareSplit( std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats, const std::shared_ptr& rowIndexColumn) { + // The base file reader needs to be created before checking if the split is + // empty. createReader(std::move(metadataFilter), rowIndexColumn); + // checkIfSplitIsEmpty needs to call fileTypeWithId = reader->typeWithId(); if (checkIfSplitIsEmpty(runtimeStats)) { VELOX_CHECK(emptySplit_); return; } - createRowReader(); + // std::shared_ptr icebergSplit = + // std::dynamic_pointer_cast(hiveSplit_); + // const auto& deleteFiles = icebergSplit->deleteFiles; + // std::unordered_set equalityFieldIds; + // for (const auto& deleteFile : deleteFiles) { + // if (deleteFile.content == FileContent::kEqualityDeletes && + // deleteFile.recordCount > 0) { + // equalityFieldIds.insert( + // deleteFile.equalityFieldIds.begin(), + // deleteFile.equalityFieldIds.end()); + // } + // } + // + // for (int32_t id : equalityFieldIds) { + // scanSpec_->getOrCreateChild(baseReader_->rowType()->nameOf(id - 1)); + // } std::shared_ptr icebergSplit = std::dynamic_pointer_cast(hiveSplit_); + const auto& deleteFiles = icebergSplit->deleteFiles; + + // Process the equality delete files to update the scan spec and remaining + // filters. It needs to be done after creating the Reader and before creating + // the RowReader. 
+ + SubfieldFilters subfieldFilters; + std::vector conjunctInputs; + + for (const auto& deleteFile : deleteFiles) { + if (deleteFile.content == FileContent::kEqualityDeletes && + deleteFile.recordCount > 0) { + // TODO: build a cache of parsed delete files to avoid repeating file + // parsing across partitions. Within a single partition, the splits should + // be with the same equality delete files and only need to be parsed once. + auto equalityDeleteReader = std::make_unique( + deleteFile, + baseFileSchema(), + fileHandleFactory_, + executor_, + connectorQueryCtx_, + hiveConfig_, + ioStats_, + runtimeStats, + hiveSplit_->connectorId); + equalityDeleteReader->readDeleteValues(subfieldFilters, conjunctInputs); + } + } + + if (!subfieldFilters.empty()) { + originalScanSpec_ = scanSpec_->clone(); + + for (auto iter = subfieldFilters.begin(); iter != subfieldFilters.end(); + iter++) { + auto childSpec = scanSpec_->getOrCreateChild(iter->first); + childSpec->addFilter(*iter->second); + childSpec->setSubscript(scanSpec_->children().size() - 1); + } + } + + if (!conjunctInputs.empty()) { + core::TypedExprPtr expression = + std::make_shared(BOOLEAN(), conjunctInputs, "and"); + deleteExprSet_ = expressionEvaluator_->compile(expression); + VELOX_CHECK_EQ(deleteExprSet_->size(), 1); + } + + createRowReader(); + baseReadOffset_ = 0; splitOffset_ = baseRowReader_->nextRowNumber(); - positionalDeleteFileReaders_.clear(); - const auto& deleteFiles = icebergSplit->deleteFiles; + // Create the positional deletes file readers. They need to be created after + // the RowReader is created. 
+ positionalDeleteFileReaders_.clear(); for (const auto& deleteFile : deleteFiles) { if (deleteFile.content == FileContent::kPositionalDeletes) { if (deleteFile.recordCount > 0) { @@ -88,8 +171,6 @@ void IcebergSplitReader::prepareSplit( splitOffset_, hiveSplit_->connectorId)); } - } else { - VELOX_NYI(); } } } @@ -146,6 +227,29 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { : nullptr; auto rowsScanned = baseRowReader_->next(size, output, &mutation); + + // Evaluate the remaining filter deleteExprSet_ for every batch and update the + // output vector if it reduces any rows. + if (deleteExprSet_) { + auto filterStartMicros = getCurrentTimeMicro(); + + filterRows_.resize(output->size()); + auto rowVector = std::dynamic_pointer_cast(output); + expressionEvaluator_->evaluate( + deleteExprSet_.get(), filterRows_, *rowVector, filterResult_); + auto numRemainingRows = exec::processFilterResults( + filterResult_, filterRows_, filterEvalCtx_, pool_); + + if (numRemainingRows < output->size()) { + output = exec::wrap( + numRemainingRows, filterEvalCtx_.selectedIndices, rowVector); + } + + totalRemainingFilterTime_.fetch_add( + (getCurrentTimeMicro() - filterStartMicros) * 1000, + std::memory_order_relaxed); + } + baseReadOffset_ += rowsScanned; deleteBitmapBitOffset_ = rowsScanned; diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index b5ab7da64480..2de26d630290 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -19,6 +19,7 @@ #include "velox/connectors/Connector.h" #include "velox/connectors/hive/SplitReader.h" #include "velox/connectors/hive/iceberg/PositionalDeleteFileReader.h" +#include "velox/exec/OperatorUtils.h" namespace facebook::velox::connector::hive::iceberg { @@ -37,9 +38,12 @@ class IcebergSplitReader : public SplitReader { const std::shared_ptr& ioStats, FileHandleFactory* fileHandleFactory, 
folly::Executor* executor, - const std::shared_ptr& scanSpec); + const std::shared_ptr& scanSpec, + std::shared_ptr& remainingFilterExprSet, + core::ExpressionEvaluator* expressionEvaluator, + std::atomic& totalRemainingFilterTime); - ~IcebergSplitReader() override = default; + ~IcebergSplitReader() override; void prepareSplit( std::shared_ptr metadataFilter, @@ -49,6 +53,11 @@ class IcebergSplitReader : public SplitReader { uint64_t next(uint64_t size, VectorPtr& output) override; private: + // The ScanSpec may need to be updated for different partitions if the split + // comes with single column equality delete files. So we need to keep a copy + // of the original ScanSpec. + std::shared_ptr originalScanSpec_; + // The read offset to the beginning of the split in number of rows for the // current batch for the base data file uint64_t baseReadOffset_; @@ -60,5 +69,14 @@ class IcebergSplitReader : public SplitReader { // The offset in bits of the deleteBitmap_ starting from where the bits shall // be consumed uint64_t deleteBitmapBitOffset_; + + std::unique_ptr deleteExprSet_; + core::ExpressionEvaluator* expressionEvaluator_; + std::atomic& totalRemainingFilterTime_; + + // Reusable memory for remaining filter evaluation. 
+ VectorPtr filterResult_; + SelectivityVector filterRows_; + exec::FilterEvalCtx filterEvalCtx_; }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index d79e21b73343..2cdcb5061a0a 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -100,53 +100,6 @@ class HiveIcebergTest : public HiveConnectorTestBase { rowGroupSizesForFiles, deleteFilesForBaseDatafiles, 0); } - void assertMultipleSplits( - const std::vector& deletePositions, - int32_t splitCount, - int32_t numPrefetchSplits) { - std::map> rowGroupSizesForFiles; - for (int32_t i = 0; i < splitCount; i++) { - std::string dataFileName = fmt::format("data_file_{}", i); - rowGroupSizesForFiles[dataFileName] = {rowCount}; - } - - std::unordered_map< - std::string, - std::multimap>> - deleteFilesForBaseDatafiles; - for (int i = 0; i < splitCount; i++) { - std::string deleteFileName = fmt::format("delete_file_{}", i); - deleteFilesForBaseDatafiles[deleteFileName] = { - {fmt::format("data_file_{}", i), deletePositions}}; - } - - assertPositionalDeletes( - rowGroupSizesForFiles, deleteFilesForBaseDatafiles, numPrefetchSplits); - } - - std::vector makeRandomIncreasingValues(int64_t begin, int64_t end) { - VELOX_CHECK(begin < end); - - std::mt19937 gen{0}; - std::vector values; - values.reserve(end - begin); - for (int i = begin; i < end; i++) { - if (folly::Random::rand32(0, 10, gen) > 8) { - values.push_back(i); - } - } - return values; - } - - std::vector makeContinuousIncreasingValues( - int64_t begin, - int64_t end) { - std::vector values; - values.resize(end - begin); - std::iota(values.begin(), values.end(), begin); - return values; - } - /// @rowGroupSizesForFiles The key is the file name, and the value is a vector /// of RowGroup sizes /// @deleteFilesForBaseDatafiles The key is the delete file name, 
and the @@ -214,7 +167,7 @@ class HiveIcebergTest : public HiveConnectorTestBase { std::string duckdbSql = getDuckDBQuery(rowGroupSizesForFiles, deleteFilesForBaseDatafiles); - auto plan = tableScanNode(); + auto plan = tableScanNode(rowType_); auto task = HiveConnectorTestBase::assertQuery( plan, splits, duckdbSql, numPrefetchSplits); @@ -225,9 +178,157 @@ class HiveIcebergTest : public HiveConnectorTestBase { ASSERT_TRUE(it->second.peakMemoryBytes > 0); } + void assertEqualityDeletes( + const std::vector>& equalityDeleteVector, + const std::vector& equalityFieldIds, + std::string duckDbSql = "") { + // We will create data vectors with numColumns number of columns that is the + // max field Id in equalityFieldIds + int32_t numDataColumns = + *std::max_element(equalityFieldIds.begin(), equalityFieldIds.end()); + + VELOX_CHECK_GT(numDataColumns, 0); + VELOX_CHECK_GE(numDataColumns, equalityDeleteVector.size()); + VELOX_CHECK_GT(equalityDeleteVector.size(), 0); + + auto numDeletedValues = equalityDeleteVector[0].size(); + + VELOX_CHECK_LE(equalityFieldIds.size(), numDataColumns); + + std::shared_ptr dataFilePath = + writeDataFiles(rowCount, numDataColumns)[0]; + + std::shared_ptr deleteFilePath = + writeEqualityDeleteFile(equalityDeleteVector); + IcebergDeleteFile deleteFile( + FileContent::kEqualityDeletes, + deleteFilePath->getPath(), + fileFomat_, + equalityDeleteVector[0].size(), + testing::internal::GetFileSize( + std::fopen(deleteFilePath->getPath().c_str(), "r")), + equalityFieldIds); + + auto icebergSplit = makeIcebergSplit(dataFilePath->getPath(), {deleteFile}); + + std::string predicates = + makePredicates(equalityDeleteVector, equalityFieldIds); + + // Select all columns + duckDbSql = "SELECT * FROM tmp"; + if (numDeletedValues > 0) { + duckDbSql += fmt::format(" WHERE {}", predicates); + } + + assertEqualityDeletes(icebergSplit, rowType_, duckDbSql); + + // Select a column that's not in the filter columns + if (numDataColumns > 1 && 
equalityDeleteVector.size() < numDataColumns) { + // if (inputDuckDbSql.empty()) { + std::string duckDbSql = "SELECT c0 FROM tmp"; + if (numDeletedValues > 0) { + duckDbSql += fmt::format(" WHERE {}", predicates); + } + + std::vector names({"c0"}); + std::vector types(1, BIGINT()); + assertEqualityDeletes( + icebergSplit, + std::make_shared(std::move(names), std::move(types)), + duckDbSql); + } + } + + void assertMultipleSplits( + const std::vector& deletePositions, + int32_t splitCount, + int32_t numPrefetchSplits) { + std::map> rowGroupSizesForFiles; + for (int32_t i = 0; i < splitCount; i++) { + std::string dataFileName = fmt::format("data_file_{}", i); + rowGroupSizesForFiles[dataFileName] = {rowCount}; + } + + std::unordered_map< + std::string, + std::multimap>> + deleteFilesForBaseDatafiles; + for (int i = 0; i < splitCount; i++) { + std::string deleteFileName = fmt::format("delete_file_{}", i); + deleteFilesForBaseDatafiles[deleteFileName] = { + {fmt::format("data_file_{}", i), deletePositions}}; + } + + assertPositionalDeletes( + rowGroupSizesForFiles, deleteFilesForBaseDatafiles, numPrefetchSplits); + } + + std::vector makeRandomIncreasingValues(int64_t begin, int64_t end) { + VELOX_CHECK(begin < end); + + std::mt19937 gen{0}; + std::vector values; + values.reserve(end - begin); + for (int i = begin; i < end; i++) { + if (folly::Random::rand32(0, 10, gen) > 8) { + values.push_back(i); + } + } + return values; + } + + std::vector makeContinuousIncreasingValues( + int64_t begin, + int64_t end) { + std::vector values; + values.resize(end - begin); + std::iota(values.begin(), values.end(), begin); + return values; + } + + std::vector makeSequenceValues(int32_t numRows, int8_t repeat = 1) { + VELOX_CHECK_GT(repeat, 0); + + auto maxValue = std::ceil((double)numRows / repeat); + std::vector values; + values.reserve(numRows); + for (int32_t i = 0; i < maxValue; i++) { + for (int8_t j = 0; j < repeat; j++) { + values.push_back(i); + } + } + values.resize(numRows); 
+ return values; + } + + std::vector makeRandomDeleteValues(int32_t maxRowNumber) { + std::mt19937 gen{0}; + std::vector deleteRows; + for (int i = 0; i < maxRowNumber; i++) { + if (folly::Random::rand32(0, 10, gen) > 8) { + deleteRows.push_back(i); + } + } + return deleteRows; + } + const static int rowCount = 20000; private: + void assertEqualityDeletes( + std::shared_ptr split, + RowTypePtr outputRowType, + const std::string& duckDbSql) { + auto plan = tableScanNode(outputRowType); + auto task = OperatorTestBase::assertQuery(plan, {split}, duckDbSql); + + auto planStats = toPlanStats(task->taskStats()); + auto scanNodeId = plan->id(); + auto it = planStats.find(scanNodeId); + ASSERT_TRUE(it != planStats.end()); + ASSERT_TRUE(it->second.peakMemoryBytes > 0); + } + std::map> writeDataFiles( std::map> rowGroupSizesForFiles) { std::map> dataFilePaths; @@ -258,6 +359,24 @@ class HiveIcebergTest : public HiveConnectorTestBase { return dataFilePaths; } + std::vector> writeDataFiles( + uint64_t numRows, + int32_t numColumns = 1, + int32_t splitCount = 1) { + auto dataVectors = makeVectors(splitCount, numRows, numColumns); + VELOX_CHECK_EQ(dataVectors.size(), splitCount); + + std::vector> dataFilePaths; + dataFilePaths.reserve(splitCount); + for (auto i = 0; i < splitCount; i++) { + dataFilePaths.emplace_back(TempFilePath::create()); + writeToFile(dataFilePaths.back()->getPath(), dataVectors[i]); + } + + createDuckDbTable(dataVectors); + return dataFilePaths; + } + /// Input is like <"deleteFile1", <"dataFile1", {pos_RG1, pos_RG2,..}>, /// <"dataFile2", {pos_RG1, pos_RG2,..}> std::unordered_map< @@ -335,6 +454,44 @@ class HiveIcebergTest : public HiveConnectorTestBase { return vectors; } + std::vector + makeVectors(int32_t count, int32_t rowsPerVector, int32_t numColumns = 1) { + std::vector types(numColumns, BIGINT()); + std::vector names; + for (int j = 0; j < numColumns; j++) { + names.push_back(fmt::format("c{}", j)); + } + + std::vector rowVectors; + for (int i = 0; 
i < count; i++) { + std::vector vectors; + + // Create the column values like below: + // c0 c1 c2 + // 0 0 0 + // 1 0 0 + // 2 1 0 + // 3 1 1 + // 4 2 1 + // 5 2 1 + // 6 3 2 + // ... + // In the first column c0, the values are continuously increasing and not + // repeating. In the second column c1, the values are continuously + // increasing and each value repeats once. And so on. + for (int j = 0; j < numColumns; j++) { + auto data = makeSequenceValues(rowsPerVector, j + 1); + vectors.push_back(vectorMaker_.flatVector(data)); + } + + rowVectors.push_back(makeRowVector(names, vectors)); + } + + rowType_ = std::make_shared(std::move(names), std::move(types)); + + return rowVectors; + } + std::shared_ptr makeIcebergSplit( const std::string& dataFilePath, const std::vector& deleteFiles = {}) { @@ -472,8 +629,83 @@ class HiveIcebergTest : public HiveConnectorTestBase { }); } - core::PlanNodePtr tableScanNode() { - return PlanBuilder(pool_.get()).tableScan(rowType_).planNode(); + core::PlanNodePtr tableScanNode(RowTypePtr outputRowType) { + return PlanBuilder(pool_.get()).tableScan(outputRowType).planNode(); + } + + std::shared_ptr writeEqualityDeleteFile( + const std::vector>& equalityDeleteVector) { + std::vector names; + std::vector vectors; + for (int i = 0; i < equalityDeleteVector.size(); i++) { + names.push_back(fmt::format("c{}", i)); + vectors.push_back( + vectorMaker_.flatVector(equalityDeleteVector[i])); + } + + RowVectorPtr deleteFileVectors = makeRowVector(names, vectors); + + auto deleteFilePath = TempFilePath::create(); + writeToFile(deleteFilePath->getPath(), deleteFileVectors); + + return deleteFilePath; + } + + std::string makePredicates( + const std::vector>& equalityDeleteVector, + const std::vector& equalityFieldIds) { + std::string predicates(""); + int32_t numDataColumns = + *std::max_element(equalityFieldIds.begin(), equalityFieldIds.end()); + + VELOX_CHECK_GT(numDataColumns, 0); + VELOX_CHECK_GE(numDataColumns, equalityDeleteVector.size()); + 
VELOX_CHECK_GT(equalityDeleteVector.size(), 0); + + auto numDeletedValues = equalityDeleteVector[0].size(); + + if (numDeletedValues == 0) { + return predicates; + } + + // If all values for a column are deleted, just return an always-false + // predicate + for (auto i = 0; i < equalityDeleteVector.size(); i++) { + auto equalityFieldId = equalityFieldIds[i]; + auto deleteValues = equalityDeleteVector[i]; + + auto lastIter = std::unique(deleteValues.begin(), deleteValues.end()); + auto numDistinctValues = lastIter - deleteValues.begin(); + auto minValue = 1; + auto maxValue = *std::max_element(deleteValues.begin(), lastIter); + if (maxValue - minValue + 1 == numDistinctValues && + maxValue == (rowCount - 1) / equalityFieldId) { + return "1 = 0"; + } + } + + if (equalityDeleteVector.size() == 1) { + std::string name = fmt::format("c{}", equalityFieldIds[0] - 1); + predicates = fmt::format( + "{} NOT IN ({})", name, makeNotInList({equalityDeleteVector[0]})); + } else { + for (int i = 0; i < numDeletedValues; i++) { + std::string oneRow(""); + for (int j = 0; j < equalityFieldIds.size(); j++) { + std::string name = fmt::format("c{}", equalityFieldIds[j] - 1); + std::string predicate = + fmt::format("({} <> {})", name, equalityDeleteVector[j][i]); + + oneRow = oneRow == "" ? predicate + : fmt::format("({} OR {})", oneRow, predicate); + } + + predicates = predicates == "" + ? oneRow + : fmt::format("{} AND {}", predicates, oneRow); + } + } + return predicates; } dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF}; @@ -659,4 +891,81 @@ TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) { assertMultipleSplits({}, 10, 3); } +// Delete values from a single column file +TEST_F(HiveIcebergTest, equalityDeletesSingleFileColumn1) { + folly::SingletonVault::singleton()->registrationComplete(); + + // Delete row 0, 1, 2, 3 from the first batch out of two. 
+ assertEqualityDeletes({{0, 1, 2, 3}}, {1}); + // Delete the first and last row in each batch (10000 rows per batch) + assertEqualityDeletes({{0, 9999, 10000, 19999}}, {1}); + // Delete several rows in the second batch (10000 rows per batch) + assertEqualityDeletes({{10000, 10002, 19999}}, {1}); + // Delete random rows + assertEqualityDeletes({makeRandomDeleteValues(rowCount)}, {1}); + // Delete 0 rows + assertEqualityDeletes({{}}, {1}); + // Delete all rows + assertEqualityDeletes({makeSequenceValues(rowCount)}, {1}); + // Delete rows that don't exist + assertEqualityDeletes({{20000, 29999}}, {1}); +} + +// Delete values from the second column in a 2-column file +// +// c0 c1 +// 0 0 +// 1 0 +// 2 1 +// 3 1 +// 4 2 +// ... ... +// 19999 9999 +TEST_F(HiveIcebergTest, equalityDeletesSingleFileColumn2) { + folly::SingletonVault::singleton()->registrationComplete(); + + // Delete values 0, 1, 2, 3 from the second column + assertEqualityDeletes({{0, 1, 2, 3}}, {2}); + // Delete the smallest value 0 and the largest value 9999 from the second + // column, which has the range [0, 9999] + assertEqualityDeletes({{0, 9999}}, {2}); + // Delete non-existent values from the second column + assertEqualityDeletes({{10000, 10002, 19999}}, {2}); + // Delete a superset [0, rowCount) of the second column's values + assertEqualityDeletes({makeSequenceValues(rowCount)}, {2}); + // Delete 0 values + assertEqualityDeletes({{}}, {2}); + // Delete all values + assertEqualityDeletes({makeSequenceValues(rowCount / 2)}, {2}); +} + +// Delete values from 2 columns with the following data: +// +// c0 c1 +// 0 0 +// 1 0 +// 2 1 +// 3 1 +// 4 2 +// ... ... 
+// 19999 9999 +TEST_F(HiveIcebergTest, equalityDeletesSingleFileMultipleColumns) { + folly::SingletonVault::singleton()->registrationComplete(); + + // Delete rows 0, 1 + assertEqualityDeletes({{0, 1}, {0, 0}}, {1, 2}); + // Delete rows 0, 2, 4, 6 + assertEqualityDeletes({{0, 2, 4, 6}, {0, 1, 2, 3}}, {1, 2}); + // Delete the last row + assertEqualityDeletes({{19999}, {9999}}, {1, 2}); + // Delete non-existent values + assertEqualityDeletes({{20000, 30000}, {10000, 1500}}, {1, 2}); + // Delete 0 values + assertEqualityDeletes({{}, {}}, {1, 2}, "SELECT * FROM tmp"); + // Delete all values + assertEqualityDeletes( + {makeSequenceValues(rowCount), makeSequenceValues(rowCount, 2)}, + {1, 2}, + "SELECT * FROM tmp WHERE 1 = 0"); +} } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp b/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp index e0b2a6c31f85..fe26c8419db2 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp @@ -15,7 +15,11 @@ */ #include "velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.h" + #include + +#include + #include "velox/exec/tests/utils/PrefixSortUtils.h" using namespace facebook::velox; @@ -328,9 +332,14 @@ void IcebergSplitReaderBenchmark::readSingleColumn( suspender.dismiss(); + auto ioExecutor = std::make_unique(3); + std::shared_ptr remainingFilterExprSet{nullptr}; + std::atomic totalRemainingFilterTime; + uint64_t resultSize = 0; for (std::shared_ptr split : splits) { scanSpec->resetCachedValues(true); + std::unique_ptr icebergSplitReader = std::make_unique( split, @@ -341,8 +350,11 @@ void IcebergSplitReaderBenchmark::readSingleColumn( rowType, ioStats, &fileHandleFactory, - nullptr, - scanSpec); + ioExecutor.get(), + scanSpec, + remainingFilterExprSet, + connectorQueryCtx_->expressionEvaluator(), + totalRemainingFilterTime); 
std::shared_ptr randomSkip; icebergSplitReader->configureReaderOptions(randomSkip); diff --git a/velox/dwio/common/ScanSpec.cpp b/velox/dwio/common/ScanSpec.cpp index a37cbc98ec6a..123c16b97ba0 100644 --- a/velox/dwio/common/ScanSpec.cpp +++ b/velox/dwio/common/ScanSpec.cpp @@ -30,6 +30,64 @@ ScanSpec* ScanSpec::getOrCreateChild(const std::string& name) { return child; } +ScanSpec& ScanSpec::operator=(const ScanSpec& other) { + if (this != &other) { + numReads_ = other.numReads_; + subscript_ = other.subscript_; + fieldName_ = other.fieldName_; + channel_ = other.channel_; + constantValue_ = other.constantValue_; + projectOut_ = other.projectOut_; + extractValues_ = other.extractValues_; + makeFlat_ = other.makeFlat_; + filter_ = other.filter_; + metadataFilters_ = other.metadataFilters_; + selectivity_ = other.selectivity_; + enableFilterReorder_ = other.enableFilterReorder_; + children_ = other.children_; + stableChildren_ = other.stableChildren_; + childByFieldName_ = other.childByFieldName_; + valueHook_ = other.valueHook_; + isArrayElementOrMapEntry_ = other.isArrayElementOrMapEntry_; + maxArrayElementsCount_ = other.maxArrayElementsCount_; + } + return *this; +} + +std::shared_ptr ScanSpec::clone() { + if (children_.empty()) { + return std::make_shared(*this); + } + + std::vector> childrenCopy; + folly::F14FastMap childByFieldNameCopy; + std::vector stableChildrenCopy; + stableChildrenCopy.resize(stableChildren_.size()); + + for (auto& child : children_) { + auto childCopy = child->clone(); + if (child->filter_) { + childCopy->filter_ = child->filter_->clone(); + } + + childrenCopy.push_back(childCopy); + childByFieldNameCopy[childCopy->fieldName()] = childCopy.get(); + + auto iter = + std::find(stableChildren_.begin(), stableChildren_.end(), child.get()); + if (iter != stableChildren_.end()) { + stableChildrenCopy[iter - stableChildren_.begin()] = childCopy.get(); + } + } + + auto copy = std::make_shared(*this); + copy->children_ = childrenCopy; + 
copy->childByFieldName_ = childByFieldNameCopy; + copy->stableChildren_ = stableChildrenCopy; + + return copy; +} + ScanSpec* ScanSpec::getOrCreateChild(const Subfield& subfield) { auto* container = this; const auto& path = subfield.path(); diff --git a/velox/dwio/common/ScanSpec.h b/velox/dwio/common/ScanSpec.h index f549ea433248..441f338ecf81 100644 --- a/velox/dwio/common/ScanSpec.h +++ b/velox/dwio/common/ScanSpec.h @@ -46,6 +46,14 @@ class ScanSpec { explicit ScanSpec(const std::string& name) : fieldName_(name) {} + ScanSpec(const ScanSpec& other) { + *this = other; + } + + ScanSpec& operator=(const ScanSpec&); + + std::shared_ptr clone(); + // Filter to apply. If 'this' corresponds to a struct/list/map, this // can only be isNull or isNotNull, other filtering is given by // 'children'. @@ -366,7 +374,7 @@ class ScanSpec { // True if a string dictionary or flat map in this field should be // returned as flat. bool makeFlat_ = false; - std::unique_ptr filter_; + std::shared_ptr filter_; // Filters that will be only used for row group filtering based on metadata. 
// The conjunctions among these filters are tracked in MetadataFilter, with diff --git a/velox/dwio/common/TypeWithId.cpp b/velox/dwio/common/TypeWithId.cpp index 03328024ef67..b50451098bcf 100644 --- a/velox/dwio/common/TypeWithId.cpp +++ b/velox/dwio/common/TypeWithId.cpp @@ -78,6 +78,7 @@ std::unique_ptr TypeWithId::create( uint32_t next = 1; std::vector> children(type->size()); for (int i = 0, size = type->size(); i < size; ++i) { + // The spec may not contain a child for every field of 'type'. auto* childSpec = spec.childByName(type->nameOf(i)); if (childSpec && !childSpec->isConstant()) { children[i] = create(type->childAt(i), next, i); diff --git a/velox/expression/Expr.cpp b/velox/expression/Expr.cpp index ee06286dfa4f..29d5d6b12e01 100644 --- a/velox/expression/Expr.cpp +++ b/velox/expression/Expr.cpp @@ -1769,6 +1769,27 @@ ExprSet::ExprSet( } } +ExprSet::ExprSet( + const std::vector>& sources, + core::ExecCtx* execCtx) + : execCtx_(execCtx) { + clear(); + exprs_ = sources; + std::vector allDistinctFields; + for (auto& expr : exprs_) { + Expr::mergeFields( + distinctFields_, multiplyReferencedFields_, expr->distinctFields()); + } +} + +void ExprSet::operator=(const ExprSet& another) { + exprs_ = another.exprs_; + distinctFields_ = another.distinctFields_; + multiplyReferencedFields_ = another.multiplyReferencedFields_; + toReset_ = another.toReset_; + memoizingExprs_ = another.memoizingExprs_; +} + namespace { void addStats( const exec::Expr& expr, diff --git a/velox/expression/Expr.h b/velox/expression/Expr.h index ade47d61f8b0..2b51f5c2bb0d 100644 --- a/velox/expression/Expr.h +++ b/velox/expression/Expr.h @@ -45,7 +45,9 @@ DECLARE_string(velox_save_input_on_expression_system_failure_path); namespace facebook::velox::exec { class ExprSet; + class FieldReference; + class VectorFunction; struct ExprStats { @@ -709,6 +711,12 @@ class ExprSet { core::ExecCtx* execCtx, bool enableConstantFolding = true); + ExprSet( + const std::vector>& sources, + core::ExecCtx* execCtx); + 
+ void operator=(const ExprSet& another); + virtual ~ExprSet(); // Initialize and evaluate all expressions available in this ExprSet. diff --git a/velox/expression/tests/SimpleFunctionTest.cpp b/velox/expression/tests/SimpleFunctionTest.cpp index ae5d1b75a745..8cbcea3638b1 100644 --- a/velox/expression/tests/SimpleFunctionTest.cpp +++ b/velox/expression/tests/SimpleFunctionTest.cpp @@ -999,7 +999,7 @@ VectorPtr testVariadicArgReuse( // Create a dummy EvalCtx. SelectivityVector rows(inputs[0]->size()); - exec::ExprSet exprSet({}, execCtx); + exec::ExprSet exprSet(std::vector{}, execCtx); RowVectorPtr inputRows = vectorMaker.rowVector({}); exec::EvalCtx evalCtx(execCtx, &exprSet, inputRows.get()); diff --git a/velox/functions/prestosql/tests/ElementAtTest.cpp b/velox/functions/prestosql/tests/ElementAtTest.cpp index 1a7a6b8d003d..04d2bb30fbe5 100644 --- a/velox/functions/prestosql/tests/ElementAtTest.cpp +++ b/velox/functions/prestosql/tests/ElementAtTest.cpp @@ -133,7 +133,7 @@ class ElementAtTest : public FunctionBaseTest { test::assertEqualVectors(expected, result); } // case 4: Verify NaNs are identified when employing caching. - exec::ExprSet exprSet({}, &execCtx_); + exec::ExprSet exprSet(std::vector{}, &execCtx_); auto inputs = makeRowVector({}); exec::EvalCtx evalCtx(&execCtx_, &exprSet, inputs.get()); @@ -1059,7 +1059,7 @@ TEST_F(ElementAtTest, testCachingOptimization) { } // Make a dummy eval context. - exec::ExprSet exprSet({}, &execCtx_); + exec::ExprSet exprSet(std::vector{}, &execCtx_); auto inputs = makeRowVector({}); exec::EvalCtx evalCtx(&execCtx_, &exprSet, inputs.get()); @@ -1196,7 +1196,7 @@ TEST_F(ElementAtTest, testCachingOptimizationComplexKey) { } // Make a dummy eval context. 
- exec::ExprSet exprSet({}, &execCtx_); + exec::ExprSet exprSet(std::vector{}, &execCtx_); auto inputs = makeRowVector({}); exec::EvalCtx evalCtx(&execCtx_, &exprSet, inputs.get()); diff --git a/velox/functions/prestosql/tests/StringFunctionsTest.cpp b/velox/functions/prestosql/tests/StringFunctionsTest.cpp index d884a25c2f56..1e97c4c5c79b 100644 --- a/velox/functions/prestosql/tests/StringFunctionsTest.cpp +++ b/velox/functions/prestosql/tests/StringFunctionsTest.cpp @@ -1395,7 +1395,7 @@ void StringFunctionsTest::testReplaceInPlace( auto replaceFunction = exec::getVectorFunction("replace", {VARCHAR(), VARCHAR()}, {}, config); SelectivityVector rows(tests.size()); - ExprSet exprSet({}, &execCtx_); + ExprSet exprSet(std::vector{}, &execCtx_); RowVectorPtr inputRows = makeRowVector({}); exec::EvalCtx evalCtx(&execCtx_, &exprSet, inputRows.get()); replaceFunction->apply(rows, functionInputs, VARCHAR(), evalCtx, resultPtr); diff --git a/velox/type/Filter.cpp b/velox/type/Filter.cpp index d0f6932b4aad..f474694fcf34 100644 --- a/velox/type/Filter.cpp +++ b/velox/type/Filter.cpp @@ -1180,7 +1180,7 @@ std::unique_ptr createBigintValuesFilter( std::unique_ptr createBigintValues( const std::vector& values, bool nullAllowed) { - return createBigintValuesFilter(values, nullAllowed, false); + return common::createBigintValuesFilter(values, nullAllowed, false); } std::unique_ptr createHugeintValues( @@ -1196,7 +1196,7 @@ std::unique_ptr createHugeintValues( std::unique_ptr createNegatedBigintValues( const std::vector& values, bool nullAllowed) { - return createBigintValuesFilter(values, nullAllowed, true); + return common::createBigintValuesFilter(values, nullAllowed, true); } BigintMultiRange::BigintMultiRange(