Skip to content

Commit

Permalink
Support reading Iceberg split with equality deletes
Browse files Browse the repository at this point in the history
This commit introduces EqualityDeleteFileReader, which is used to read
Iceberg splits with equality delete files.

Co-authored-by: Naveen Kumar Mahadevuni <[email protected]>
  • Loading branch information
yingsu00 and Naveen Kumar Mahadevuni committed Sep 5, 2024
1 parent 94dcf02 commit 42d1bb6
Show file tree
Hide file tree
Showing 21 changed files with 1,053 additions and 76 deletions.
6 changes: 3 additions & 3 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
#pragma once

#include "folly/CancellationToken.h"
#include <folly/CancellationToken.h>
#include <folly/Synchronized.h>

#include "velox/common/base/AsyncSource.h"
#include "velox/common/base/PrefixSortConfig.h"
#include "velox/common/base/RuntimeMetrics.h"
Expand All @@ -27,8 +29,6 @@
#include "velox/core/ExpressionEvaluator.h"
#include "velox/vector/ComplexVector.h"

#include <folly/Synchronized.h>

namespace facebook::velox {
class Config;
}
Expand Down
8 changes: 7 additions & 1 deletion velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,10 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
ioStats_,
fileHandleFactory_,
executor_,
scanSpec_);
scanSpec_,
remainingFilterExprSet_,
expressionEvaluator_,
totalRemainingFilterTime_);
}

std::unique_ptr<HivePartitionFunction> HiveDataSource::setupBucketConversion() {
Expand Down Expand Up @@ -283,8 +286,10 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
}

splitReader_ = createSplitReader();

// Split reader subclasses may need to use the reader options in prepareSplit
// so we initialize it beforehand.

splitReader_->configureReaderOptions(randomSkip_);
splitReader_->prepareSplit(metadataFilter_, runtimeStats_, rowIndexColumn_);
}
Expand Down Expand Up @@ -547,6 +552,7 @@ std::shared_ptr<wave::WaveDataSource> HiveDataSource::toWaveDataSource() {
void HiveDataSource::registerWaveDelegateHook(WaveDelegateHookFunction hook) {
waveDelegateHook_ = hook;
}

std::shared_ptr<wave::WaveDataSource> toWaveDataSource();

} // namespace facebook::velox::connector::hive
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class HiveDataSource : public DataSource {
subfields_;
SubfieldFilters filters_;
std::shared_ptr<common::MetadataFilter> metadataFilter_;
std::unique_ptr<exec::ExprSet> remainingFilterExprSet_;
std::shared_ptr<exec::ExprSet> remainingFilterExprSet_;
RowVectorPtr emptyOutput_;
dwio::common::RuntimeStatistics runtimeStats_;
std::atomic<uint64_t> totalRemainingFilterTime_{0};
Expand Down
15 changes: 13 additions & 2 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ std::unique_ptr<SplitReader> SplitReader::create(
const std::shared_ptr<io::IoStatistics>& ioStats,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const std::shared_ptr<common::ScanSpec>& scanSpec) {
const std::shared_ptr<common::ScanSpec>& scanSpec,
std::shared_ptr<exec::ExprSet>& remainingFilterExprSet,
core::ExpressionEvaluator* expressionEvaluator,
std::atomic<uint64_t>& totalRemainingFilterTime) {
// Create the SplitReader based on hiveSplit->customSplitInfo["table_format"]
if (hiveSplit->customSplitInfo.count("table_format") > 0 &&
hiveSplit->customSplitInfo["table_format"] == "hive-iceberg") {
Expand All @@ -88,7 +91,10 @@ std::unique_ptr<SplitReader> SplitReader::create(
ioStats,
fileHandleFactory,
executor,
scanSpec);
scanSpec,
remainingFilterExprSet,
expressionEvaluator,
totalRemainingFilterTime);
} else {
return std::make_unique<SplitReader>(
hiveSplit,
Expand Down Expand Up @@ -179,6 +185,11 @@ void SplitReader::resetSplit() {
hiveSplit_.reset();
}

std::shared_ptr<const dwio::common::TypeWithId> SplitReader::baseFileSchema() {
VELOX_CHECK_NOT_NULL(baseReader_.get());
return baseReader_->typeWithId();
}

int64_t SplitReader::estimatedRowSize() const {
if (!baseRowReader_) {
return DataSource::kUnknownRowSize;
Expand Down
7 changes: 6 additions & 1 deletion velox/connectors/hive/SplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ class SplitReader {
const std::shared_ptr<io::IoStatistics>& ioStats,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const std::shared_ptr<common::ScanSpec>& scanSpec);
const std::shared_ptr<common::ScanSpec>& scanSpec,
std::shared_ptr<exec::ExprSet>& remainingFilterExprSet,
core::ExpressionEvaluator* expressionEvaluator,
std::atomic<uint64_t>& totalRemainingFilterTime);

SplitReader(
const std::shared_ptr<const hive::HiveConnectorSplit>& hiveSplit,
Expand Down Expand Up @@ -102,6 +105,8 @@ class SplitReader {

void resetSplit();

std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema();

int64_t estimatedRowSize() const;

void updateRuntimeStats(dwio::common::RuntimeStatistics& stats) const;
Expand Down
6 changes: 4 additions & 2 deletions velox/connectors/hive/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

velox_add_library(velox_hive_iceberg_splitreader IcebergSplitReader.cpp
IcebergSplit.cpp PositionalDeleteFileReader.cpp)
add_library(
velox_hive_iceberg_splitreader
IcebergSplitReader.cpp IcebergSplit.cpp PositionalDeleteFileReader.cpp
EqualityDeleteFileReader.cpp FilterUtil.cpp)

velox_link_libraries(velox_hive_iceberg_splitreader velox_connector
Folly::folly)
Expand Down
221 changes: 221 additions & 0 deletions velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h"

#include "velox/connectors/hive/HiveConnectorUtil.h"
#include "velox/connectors/hive/iceberg/FilterUtil.h"
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
#include "velox/dwio/common/ReaderFactory.h"
#include "velox/dwio/common/TypeUtils.h"

using namespace facebook::velox::common;
using namespace facebook::velox::core;
using namespace facebook::velox::exec;

namespace facebook::velox::connector::hive::iceberg {

static constexpr const int kMaxBatchRows = 10'000;

EqualityDeleteFileReader::EqualityDeleteFileReader(
const IcebergDeleteFile& deleteFile,
std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<const HiveConfig> hiveConfig,
std::shared_ptr<io::IoStatistics> ioStats,
dwio::common::RuntimeStatistics& runtimeStats,
const std::string& connectorId)
: deleteFile_(deleteFile),
baseFileSchema_(baseFileSchema),
connectorQueryCtx_(connectorQueryCtx),
hiveConfig_(hiveConfig),
deleteSplit_(nullptr),
fileHandleFactory_(fileHandleFactory),
executor_(executor),
pool_(connectorQueryCtx->memoryPool()),
ioStats_(ioStats),
deleteRowReader_(nullptr) {
VELOX_CHECK(deleteFile_.content == FileContent::kEqualityDeletes);

if (deleteFile_.recordCount == 0) {
return;
}

std::unordered_set<int32_t> equalityFieldIds(
deleteFile_.equalityFieldIds.begin(), deleteFile_.equalityFieldIds.end());
auto deleteFieldSelector = [&equalityFieldIds](size_t index) {
return equalityFieldIds.find(static_cast<int32_t>(index)) !=
equalityFieldIds.end();
};
auto deleteFileSchema = dwio::common::typeutils::buildSelectedType(
baseFileSchema_, deleteFieldSelector);

rowType_ = std::static_pointer_cast<const RowType>(deleteFileSchema->type());

// TODO: push down filter if previous delete file contains this one. E.g.
// previous equality delete file has a=1, and this file also contains
// columns a, then a!=1 can be pushed as a filter when reading this delete
// file.

auto scanSpec = std::make_shared<common::ScanSpec>("<root>");
scanSpec->addAllChildFields(rowType_->asRow());

deleteSplit_ = std::make_shared<HiveConnectorSplit>(
connectorId,
deleteFile_.filePath,
deleteFile_.fileFormat,
0,
deleteFile_.fileSizeInBytes);

// Create the Reader and RowReader

dwio::common::ReaderOptions deleteReaderOpts(pool_);
configureReaderOptions(
deleteReaderOpts,
hiveConfig_,
connectorQueryCtx_,
rowType_,
deleteSplit_);

auto deleteFileHandleCachePtr =
fileHandleFactory_->generate(deleteFile_.filePath);
auto deleteFileInput = createBufferedInput(
*deleteFileHandleCachePtr,
deleteReaderOpts,
connectorQueryCtx_,
ioStats_,
executor_);

auto deleteReader =
dwio::common::getReaderFactory(deleteReaderOpts.fileFormat())
->createReader(std::move(deleteFileInput), deleteReaderOpts);

dwio::common::RowReaderOptions deleteRowReaderOpts;
configureRowReaderOptions(
deleteRowReaderOpts, {}, scanSpec, nullptr, rowType_, deleteSplit_);

deleteRowReader_.reset();
deleteRowReader_ = deleteReader->createRowReader(deleteRowReaderOpts);
}

void EqualityDeleteFileReader::readDeleteValues(
SubfieldFilters& subfieldFilters,
std::vector<core::TypedExprPtr>& expressionInputs) {
VELOX_CHECK(deleteRowReader_);
VELOX_CHECK(deleteSplit_);

if (!deleteValuesOutput_) {
deleteValuesOutput_ = BaseVector::create(rowType_, 0, pool_);
}

// TODO:: verfiy if the field is a sub-field. Velox currently doesn't support
// pushing down filters to sub-fields
if (rowType_->size() == 1) {
// Construct the IN list filter that can be pushed down to the base file
// readers, then update the baseFileScanSpec.
readSingleColumnDeleteValues(subfieldFilters);
} else {
readMultipleColumnDeleteValues(expressionInputs);
}

deleteSplit_.reset();
}

void EqualityDeleteFileReader::readSingleColumnDeleteValues(
SubfieldFilters& subfieldFilters) {
std::unique_ptr<Filter> filter = std::make_unique<AlwaysTrue>();
while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) {
if (deleteValuesOutput_->size() == 0) {
continue;
}

deleteValuesOutput_->loadedVector();
auto vector =
std::dynamic_pointer_cast<RowVector>(deleteValuesOutput_)->childAt(0);
auto name = rowType_->nameOf(0);

auto typeKind = vector->type()->kind();
VELOX_CHECK(
typeKind != TypeKind::DOUBLE && typeKind != TypeKind::REAL,
"Iceberg does not allow DOUBLE or REAL columns as the equality delete columns: {} : {}",
name,
typeKind);

auto notExistsFilter =
createNotExistsFilter(vector, 0, deleteValuesOutput_->size(), typeKind);
filter = filter->mergeWith(notExistsFilter.get());
}

if (filter->kind() != FilterKind::kAlwaysTrue) {
if (subfieldFilters.find(common::Subfield(rowType_->nameOf(0))) !=
subfieldFilters.end()) {
subfieldFilters[common::Subfield(rowType_->nameOf(0))] =
subfieldFilters[common::Subfield(rowType_->nameOf(0))]->mergeWith(
filter.get());
} else {
subfieldFilters[common::Subfield(rowType_->nameOf(0))] =
std::move(filter);
}
}
}

void EqualityDeleteFileReader::readMultipleColumnDeleteValues(
std::vector<core::TypedExprPtr>& expressionInputs) {
auto numDeleteFields = rowType_->size();
VELOX_CHECK_GT(
numDeleteFields,
0,
"Iceberg equality delete file should have at least one field.");

// TODO: logical expression simplifications
while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) {
if (deleteValuesOutput_->size() == 0) {
continue;
}

deleteValuesOutput_->loadedVector();
auto rowVector = std::dynamic_pointer_cast<RowVector>(deleteValuesOutput_);
auto numDeletedValues = rowVector->childAt(0)->size();

for (int i = 0; i < numDeletedValues; i++) {
std::vector<core::TypedExprPtr> disconjunctInputs;

for (int j = 0; j < numDeleteFields; j++) {
auto type = rowType_->childAt(j);
auto name = rowType_->nameOf(j);
auto value = BaseVector::wrapInConstant(1, i, rowVector->childAt(j));

std::vector<core::TypedExprPtr> isNotEqualInputs;
isNotEqualInputs.push_back(
std::make_shared<FieldAccessTypedExpr>(type, name));
isNotEqualInputs.push_back(std::make_shared<ConstantTypedExpr>(value));
auto isNotEqualExpr =
std::make_shared<CallTypedExpr>(BOOLEAN(), isNotEqualInputs, "neq");

disconjunctInputs.push_back(isNotEqualExpr);
}

auto disconjunctNotEqualExpr =
std::make_shared<CallTypedExpr>(BOOLEAN(), disconjunctInputs, "or");
expressionInputs.push_back(disconjunctNotEqualExpr);
}
}
}

} // namespace facebook::velox::connector::hive::iceberg

Loading

0 comments on commit 42d1bb6

Please sign in to comment.