Revert "Revert "Allow HiveSplit info columns like '$file_size' and '$…
Browse files Browse the repository at this point in the history
…file_modified_time' to be queried in SQL (facebookincubator#8800)""

This reverts commit d3dc172.
PHILO-HE committed Mar 7, 2024
1 parent d3dc172 commit fbf0636
Showing 13 changed files with 235 additions and 24 deletions.
10 changes: 8 additions & 2 deletions velox/connectors/hive/HiveConnectorSplit.h
@@ -39,6 +39,10 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
std::shared_ptr<std::string> extraFileInfo;
std::unordered_map<std::string, std::string> serdeParameters;

/// These represent columns like $file_size, $file_modified_time that are
/// associated with the HiveSplit.
std::unordered_map<std::string, std::string> infoColumns;

HiveConnectorSplit(
const std::string& connectorId,
const std::string& _filePath,
@@ -51,7 +55,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
const std::unordered_map<std::string, std::string>& _customSplitInfo = {},
const std::shared_ptr<std::string>& _extraFileInfo = {},
const std::unordered_map<std::string, std::string>& _serdeParameters = {},
int64_t _splitWeight = 0)
int64_t _splitWeight = 0,
const std::unordered_map<std::string, std::string>& _infoColumns = {})
: ConnectorSplit(connectorId, _splitWeight),
filePath(_filePath),
fileFormat(_fileFormat),
@@ -61,7 +66,8 @@
tableBucketNumber(_tableBucketNumber),
customSplitInfo(_customSplitInfo),
extraFileInfo(_extraFileInfo),
serdeParameters(_serdeParameters) {}
serdeParameters(_serdeParameters),
infoColumns(_infoColumns) {}

std::string toString() const override {
if (tableBucketNumber.has_value()) {
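
For orientation, a minimal sketch of how a caller might populate the new field. The parameter order follows the initializer list above, but the middle defaults, the literal values, and the '$file_modified_time' encoding (epoch seconds vs. milliseconds) are assumptions for illustration:

auto split = std::make_shared<HiveConnectorSplit>(
    "test-hive",                          // connectorId
    "file:/tmp/data/part-00000.parquet",  // filePath (illustrative)
    dwio::common::FileFormat::PARQUET,    // fileFormat
    /*start=*/0,
    /*length=*/1024,
    /*partitionKeys=*/{},
    /*tableBucketNumber=*/std::nullopt,
    /*customSplitInfo=*/{},
    /*extraFileInfo=*/nullptr,
    /*serdeParameters=*/{},
    /*splitWeight=*/0,
    /*infoColumns=*/
    {{"$file_size", "1024"},
     {"$file_modified_time", "1709769600"}});

Values are carried as strings; the reader converts them to each info column's declared type when the split is read (see SplitReader.cpp below).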
28 changes: 22 additions & 6 deletions velox/connectors/hive/HiveConnectorUtil.cpp
@@ -239,6 +239,13 @@ inline uint8_t parseDelimiter(const std::string& delim) {
return stoi(delim);
}

inline bool isSynthesizedColumn(
const std::string& name,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns) {
return name == kPath || name == kBucket || infoColumns.count(name) != 0;
}

} // namespace

const std::string& getColumnName(const common::Subfield& subfield) {
@@ -273,9 +280,13 @@ void checkColumnNameLowerCase(const std::shared_ptr<const Type>& type) {
}
}

void checkColumnNameLowerCase(const SubfieldFilters& filters) {
void checkColumnNameLowerCase(
const SubfieldFilters& filters,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns) {
for (auto& pair : filters) {
if (auto name = pair.first.toString(); name == kPath || name == kBucket) {
if (auto name = pair.first.toString();
isSynthesizedColumn(name, infoColumns)) {
continue;
}
auto& path = pair.first.path();
@@ -310,14 +321,17 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
const RowTypePtr& dataColumns,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns,
memory::MemoryPool* pool) {
auto spec = std::make_shared<common::ScanSpec>("root");
folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
filterSubfields;
std::vector<SubfieldSpec> subfieldSpecs;
for (auto& [subfield, _] : filters) {
if (auto name = subfield.toString();
name != kPath && name != kBucket && partitionKeys.count(name) == 0) {
!isSynthesizedColumn(name, infoColumns) &&
partitionKeys.count(name) == 0) {
filterSubfields[getColumnName(subfield)].push_back(&subfield);
}
}
@@ -364,11 +378,13 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
// SelectiveColumnReader doesn't support constant columns with filters,
// hence, we can't have a filter for a $path or $bucket column.
//
// Unfortunately, Presto happens to specify a filter for $path or
// $bucket column. This filter is redundant and needs to be removed.
// Unfortunately, Presto happens to specify a filter for $path, $file_size,
// $file_modified_time or $bucket column. This filter is redundant and needs
// to be removed.
// TODO Remove this check when Presto is fixed to not specify a filter
// on $path and $bucket column.
if (auto name = pair.first.toString(); name == kPath || name == kBucket) {
if (auto name = pair.first.toString();
isSynthesizedColumn(name, infoColumns)) {
continue;
}
auto fieldSpec = spec->getOrCreateChild(pair.first);
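
To make the pruning concrete, a hedged sketch (the setup and the exec::equal filter helper mirror the tests further down; names and values are illustrative):

// A redundant Presto-style filter on an info column is now dropped
// instead of becoming a child ScanSpec filter, which the
// SelectiveColumnReader could not evaluate against a constant column.
SubfieldFilters filters;
filters.emplace(common::Subfield("$file_size"), exec::equal(1024));
auto spec = makeScanSpec(
    rowType, {}, filters, dataColumns, partitionKeys, infoColumns, pool);
// spec has no filtered child for "$file_size"; the value is attached
// later as a per-split constant by SplitReader::adaptColumns().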
7 changes: 6 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.h
@@ -40,7 +40,10 @@ const std::string& getColumnName(const common::Subfield& subfield);

void checkColumnNameLowerCase(const std::shared_ptr<const Type>& type);

void checkColumnNameLowerCase(const SubfieldFilters& filters);
void checkColumnNameLowerCase(
const SubfieldFilters& filters,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns);

void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr);

@@ -52,6 +55,8 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
const RowTypePtr& dataColumns,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns,
memory::MemoryPool* pool);

void configureReaderOptions(
7 changes: 6 additions & 1 deletion velox/connectors/hive/HiveDataSource.cpp
@@ -57,6 +57,10 @@ HiveDataSource::HiveDataSource(
if (handle->columnType() == HiveColumnHandle::ColumnType::kPartitionKey) {
partitionKeys_.emplace(handle->name(), handle);
}

if (handle->columnType() == HiveColumnHandle::ColumnType::kSynthesized) {
infoColumns_.emplace(handle->name(), handle);
}
}

std::vector<std::string> readerRowNames;
@@ -88,7 +92,7 @@ HiveDataSource::HiveDataSource(
if (hiveConfig_->isFileColumnNamesReadAsLowerCase(
connectorQueryCtx->sessionProperties())) {
checkColumnNameLowerCase(outputType_);
checkColumnNameLowerCase(hiveTableHandle_->subfieldFilters());
checkColumnNameLowerCase(hiveTableHandle_->subfieldFilters(), infoColumns_);
checkColumnNameLowerCase(hiveTableHandle_->remainingFilter());
}

@@ -152,6 +156,7 @@ HiveDataSource::HiveDataSource(
filters,
hiveTableHandle_->dataColumns(),
partitionKeys_,
infoColumns_,
pool_);
if (remainingFilter) {
metadataFilter_ = std::make_shared<common::MetadataFilter>(
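
For context, a sketch of the kind of handle that lands in infoColumns_. The HiveColumnHandle constructor shape (name, columnType, dataType, hiveType) is assumed rather than shown in this diff; the relevant part is the kSynthesized column type that the new branch keys on:

auto fileSizeHandle = std::make_shared<HiveColumnHandle>(
    "$file_size",
    HiveColumnHandle::ColumnType::kSynthesized,
    BIGINT(),
    BIGINT());
// Partition-key handles still go to partitionKeys_; synthesized info
// column handles like this one are now collected into infoColumns_.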
4 changes: 4 additions & 0 deletions velox/connectors/hive/HiveDataSource.h
@@ -119,6 +119,10 @@ class HiveDataSource : public DataSource {

// The row type for the data source output, not including filter-only columns
const RowTypePtr outputType_;

// Column handles for the Split info columns keyed on their column names.
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>
infoColumns_;
std::shared_ptr<common::MetadataFilter> metadataFilter_;
std::unique_ptr<exec::ExprSet> remainingFilterExprSet_;
RowVectorPtr emptyOutput_;
18 changes: 15 additions & 3 deletions velox/connectors/hive/SplitReader.cpp
@@ -218,9 +218,9 @@ std::vector<TypePtr> SplitReader::adaptColumns(
auto* childSpec = childrenSpecs[i].get();
const std::string& fieldName = childSpec->fieldName();

auto iter = hiveSplit_->partitionKeys.find(fieldName);
if (iter != hiveSplit_->partitionKeys.end()) {
setPartitionValue(childSpec, fieldName, iter->second);
if (auto it = hiveSplit_->partitionKeys.find(fieldName);
it != hiveSplit_->partitionKeys.end()) {
setPartitionValue(childSpec, fieldName, it->second);
} else if (fieldName == kPath) {
auto constantVec = std::make_shared<ConstantVector<StringView>>(
connectorQueryCtx_->memoryPool(),
@@ -240,6 +240,18 @@ std::vector<TypePtr> SplitReader::adaptColumns(
std::move(bucket));
childSpec->setConstantValue(constantVec);
}
} else if (auto iter = hiveSplit_->infoColumns.find(fieldName);
iter != hiveSplit_->infoColumns.end()) {
auto infoColumnType =
readerOutputType_->childAt(readerOutputType_->getChildIdx(fieldName));
auto constant = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
newConstantFromString,
infoColumnType->kind(),
infoColumnType,
iter->second,
1,
connectorQueryCtx_->memoryPool());
childSpec->setConstantValue(constant);
} else {
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
if (!fileTypeIdx.has_value()) {
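
Conceptually, the dispatch above parses the split's string value once and wraps it in a one-row constant vector of the column's declared type. A rough equivalent for a BIGINT info column (a sketch of the effect, not the actual newConstantFromString implementation):

// iter->second holds e.g. "1024"; every row produced from this split
// then reads the same constant, mirroring the $path/$bucket handling.
auto constant = std::make_shared<ConstantVector<int64_t>>(
    connectorQueryCtx_->memoryPool(),
    /*length=*/1,
    /*isNull=*/false,
    BIGINT(),
    folly::to<int64_t>(iter->second));
childSpec->setConstantValue(constant);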
18 changes: 14 additions & 4 deletions velox/connectors/hive/iceberg/IcebergSplit.cpp
@@ -30,15 +30,21 @@ HiveIcebergSplit::HiveIcebergSplit(
_partitionKeys,
std::optional<int32_t> _tableBucketNumber,
const std::unordered_map<std::string, std::string>& _customSplitInfo,
const std::shared_ptr<std::string>& _extraFileInfo)
const std::shared_ptr<std::string>& _extraFileInfo,
const std::unordered_map<std::string, std::string>& _infoColumns)
: HiveConnectorSplit(
_connectorId,
_filePath,
_fileFormat,
_start,
_length,
_partitionKeys,
_tableBucketNumber) {
_tableBucketNumber,
_customSplitInfo,
_extraFileInfo,
{},
0,
_infoColumns) {
// TODO: Deserialize _extraFileInfo to get deleteFiles;
}

@@ -54,7 +60,8 @@ HiveIcebergSplit::HiveIcebergSplit(
std::optional<int32_t> _tableBucketNumber,
const std::unordered_map<std::string, std::string>& _customSplitInfo,
const std::shared_ptr<std::string>& _extraFileInfo,
std::vector<IcebergDeleteFile> _deletes)
std::vector<IcebergDeleteFile> _deletes,
const std::unordered_map<std::string, std::string>& _infoColumns)
: HiveConnectorSplit(
_connectorId,
_filePath,
Expand All @@ -64,6 +71,9 @@ HiveIcebergSplit::HiveIcebergSplit(
_partitionKeys,
_tableBucketNumber,
_customSplitInfo,
_extraFileInfo),
_extraFileInfo,
{},
0,
_infoColumns),
deleteFiles(_deletes) {}
} // namespace facebook::velox::connector::hive::iceberg
6 changes: 4 additions & 2 deletions velox/connectors/hive/iceberg/IcebergSplit.h
@@ -36,7 +36,8 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit {
_partitionKeys = {},
std::optional<int32_t> _tableBucketNumber = std::nullopt,
const std::unordered_map<std::string, std::string>& _customSplitInfo = {},
const std::shared_ptr<std::string>& _extraFileInfo = {});
const std::shared_ptr<std::string>& _extraFileInfo = {},
const std::unordered_map<std::string, std::string>& _infoColumns = {});

// For tests only
HiveIcebergSplit(
@@ -50,7 +51,8 @@ HiveIcebergSplit(
std::optional<int32_t> _tableBucketNumber = std::nullopt,
const std::unordered_map<std::string, std::string>& _customSplitInfo = {},
const std::shared_ptr<std::string>& _extraFileInfo = {},
std::vector<IcebergDeleteFile> deletes = {});
std::vector<IcebergDeleteFile> deletes = {},
const std::unordered_map<std::string, std::string>& _infoColumns = {});
};

} // namespace facebook::velox::connector::hive::iceberg
17 changes: 14 additions & 3 deletions velox/connectors/hive/tests/HiveConnectorTest.cpp
@@ -87,7 +87,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_multilevel) {
auto rowType = ROW({{"c0", columnType}});
auto subfields = makeSubfields({"c0.c0c1[3][\"foo\"].c0c1c0"});
auto scanSpec = makeScanSpec(
rowType, groupSubfields(subfields), {}, nullptr, {}, pool_.get());
rowType, groupSubfields(subfields), {}, nullptr, {}, {}, pool_.get());
auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0");
validateNullConstant(*c0c0, *BIGINT());
auto* c0c1 = scanSpec->childByName("c0")->childByName("c0c1");
@@ -122,6 +122,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeFields) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0");
ASSERT_FALSE(c0c0->childByName("c0c0c0")->isConstant());
@@ -144,6 +145,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArray) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_EQ(c0->maxArrayElementsCount(), 2);
@@ -160,7 +162,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArrayNegative) {
auto subfields = makeSubfields({"c0[1].c0c0", "c0[-1].c0c2"});
auto groupedSubfields = groupSubfields(subfields);
VELOX_ASSERT_USER_THROW(
makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, pool_.get()),
makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, {}, pool_.get()),
"Non-positive array subscript cannot be push down");
}

@@ -175,6 +177,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeMap) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
auto* keysFilter = c0->childByName(ScanSpec::kMapKeysFieldName)->filter();
@@ -200,6 +203,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter());
@@ -218,6 +222,7 @@
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter());
@@ -240,6 +245,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
{},
nullptr,
{},
{},
pool_.get());
auto* keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
@@ -267,6 +273,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
{},
nullptr,
{},
{},
pool_.get());
keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
@@ -285,6 +292,7 @@
{},
nullptr,
{},
{},
pool_.get());
keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
@@ -300,6 +308,7 @@
{},
nullptr,
{},
{},
pool_.get());
keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
@@ -335,6 +344,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filtersNotInRequiredSubfields) {
filters,
ROW({{"c0", c0Type}, {"c1", c1Type}}),
{},
{},
pool_.get());
auto c0 = scanSpec->childByName("c0");
ASSERT_FALSE(c0->isConstant());
@@ -379,6 +389,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_duplicateSubfields) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_EQ(c0->children().size(), 2);
@@ -392,7 +403,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filterPartitionKey) {
SubfieldFilters filters;
filters.emplace(Subfield("ds"), exec::equal("2023-10-13"));
auto scanSpec = makeScanSpec(
rowType, {}, filters, rowType, {{"ds", nullptr}}, pool_.get());
rowType, {}, filters, rowType, {{"ds", nullptr}}, {}, pool_.get());
ASSERT_TRUE(scanSpec->childByName("c0")->projectOut());
ASSERT_FALSE(scanSpec->childByName("ds")->projectOut());
}
(The remaining 4 changed files are not shown.)