Commit

Allow '$file_size' and '$file_modified_time' for HiveSplits as columns in a query
aditi-pandit committed Feb 19, 2024
1 parent 1649035 commit b7be310
Showing 12 changed files with 205 additions and 16 deletions.
8 changes: 7 additions & 1 deletion velox/connectors/hive/HiveConnectorSplit.h
@@ -25,6 +25,8 @@ namespace facebook::velox::connector::hive {
struct HiveConnectorSplit : public connector::ConnectorSplit {
const std::string filePath;
dwio::common::FileFormat fileFormat;
const int64_t fileSize;
const int64_t fileModifiedTime;
const uint64_t start;
const uint64_t length;

@@ -50,10 +52,14 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
std::optional<int32_t> _tableBucketNumber = std::nullopt,
const std::unordered_map<std::string, std::string>& _customSplitInfo = {},
const std::shared_ptr<std::string>& _extraFileInfo = {},
- const std::unordered_map<std::string, std::string>& _serdeParameters = {})
+ const std::unordered_map<std::string, std::string>& _serdeParameters = {},
+ int64_t _fileSize = 0,
+ int64_t _fileModifiedTime = 0)
: ConnectorSplit(connectorId),
filePath(_filePath),
fileFormat(_fileFormat),
fileSize(_fileSize),
fileModifiedTime(_fileModifiedTime),
start(_start),
length(_length),
partitionKeys(_partitionKeys),
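
For illustration, a minimal sketch of how a split producer could populate the two new fields. Only the constructor signature comes from this change; the connector id, path, and metadata values are invented.

#include <limits>
#include <memory>
#include <optional>
#include "velox/connectors/hive/HiveConnectorSplit.h"

using namespace facebook::velox;

std::shared_ptr<connector::hive::HiveConnectorSplit> makeExampleSplit() {
  // Carry the file's size and modification time on the split so the scan
  // can answer '$file_size' and '$file_modified_time' without touching the
  // filesystem. All values below are illustrative.
  return std::make_shared<connector::hive::HiveConnectorSplit>(
      "test-hive", // connectorId (hypothetical)
      "file:/tmp/part-0.dwrf", // filePath (hypothetical)
      dwio::common::FileFormat::DWRF,
      /*start=*/0,
      /*length=*/std::numeric_limits<uint64_t>::max(),
      /*partitionKeys=*/{},
      /*tableBucketNumber=*/std::nullopt,
      /*customSplitInfo=*/{},
      /*extraFileInfo=*/nullptr,
      /*serdeParameters=*/{},
      /*fileSize=*/1024,
      /*fileModifiedTime=*/1708300800);
}
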
16 changes: 11 additions & 5 deletions velox/connectors/hive/HiveConnectorUtil.cpp
@@ -237,6 +237,11 @@ inline uint8_t parseDelimiter(const std::string& delim) {
return stoi(delim);
}

inline bool isSynthesizedColumn(const std::string& name) {
return name == kPath || name == kBucket || name == kFileSize ||
name == kFileModifiedTime;
}

} // namespace

const std::string& getColumnName(const common::Subfield& subfield) {
@@ -273,7 +278,7 @@ void checkColumnNameLowerCase(const std::shared_ptr<const Type>& type) {

void checkColumnNameLowerCase(const SubfieldFilters& filters) {
for (auto& pair : filters) {
- if (auto name = pair.first.toString(); name == kPath || name == kBucket) {
+ if (auto name = pair.first.toString(); isSynthesizedColumn(name)) {
continue;
}
auto& path = pair.first.path();
@@ -315,7 +320,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
std::vector<SubfieldSpec> subfieldSpecs;
for (auto& [subfield, _] : filters) {
if (auto name = subfield.toString();
- name != kPath && name != kBucket && partitionKeys.count(name) == 0) {
+ !isSynthesizedColumn(name) && partitionKeys.count(name) == 0) {
filterSubfields[getColumnName(subfield)].push_back(&subfield);
}
}
@@ -362,11 +367,12 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
// SelectiveColumnReader doesn't support constant columns with filters;
// hence, we can't have a filter on a synthesized column such as $path or
// $bucket.
//
- // Unfortunately, Presto happens to specify a filter for $path or
- // $bucket column. This filter is redundant and needs to be removed.
+ // Unfortunately, Presto happens to specify a filter for the $path,
+ // $file_size, $file_modified_time, or $bucket column. Such a filter is
+ // redundant and needs to be removed.
// TODO: Remove this check once Presto no longer specifies filters on
// these synthesized columns.
- if (auto name = pair.first.toString(); name == kPath || name == kBucket) {
+ if (auto name = pair.first.toString(); isSynthesizedColumn(name)) {
continue;
}
auto fieldSpec = spec->getOrCreateChild(pair.first);
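
The filter-pruning rule above can be shown with a self-contained sketch that uses only standard C++; none of the identifiers below are Velox API.

#include <iostream>
#include <map>
#include <string>

// A column is "synthesized" if it is materialized from split metadata
// rather than read from the file.
bool isSynthesizedColumn(const std::string& name) {
  return name == "$path" || name == "$bucket" || name == "$file_size" ||
      name == "$file_modified_time";
}

int main() {
  // Filters keyed by column name; synthesized columns are skipped when
  // building the reader's scan spec because their values are constants.
  std::map<std::string, std::string> filters = {
      {"$file_size", "= 4096"}, {"a", "> 10"}};
  for (const auto& [column, filter] : filters) {
    if (isSynthesizedColumn(column)) {
      continue; // evaluated against split metadata, not pushed to the reader
    }
    std::cout << "push down to reader: " << column << " " << filter << "\n";
  }
  return 0;
}
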
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.h
@@ -35,6 +35,8 @@ using SubfieldFilters =

constexpr const char* kPath = "$path";
constexpr const char* kBucket = "$bucket";
constexpr const char* kFileSize = "$file_size";
constexpr const char* kFileModifiedTime = "$file_modified_time";

const std::string& getColumnName(const common::Subfield& subfield);

6 changes: 6 additions & 0 deletions velox/connectors/hive/SplitReader.cpp
@@ -197,6 +197,12 @@ std::vector<TypePtr> SplitReader::adaptColumns(
INTEGER(),
velox::variant(hiveSplit_->tableBucketNumber.value()));
}
} else if (fieldName == kFileSize) {
setConstantValue(
childSpec, BIGINT(), velox::variant(hiveSplit_->fileSize));
} else if (fieldName == kFileModifiedTime) {
setConstantValue(
childSpec, BIGINT(), velox::variant(hiveSplit_->fileModifiedTime));
} else {
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
if (!fileTypeIdx.has_value()) {
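
Conceptually, each synthesized column becomes a constant vector built from split metadata. A sketch of that idea, assuming a BaseVector::createConstant overload taking (type, value, size, pool); the helper name is made up:

#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/vector/BaseVector.h"

using namespace facebook::velox;

// Sketch only: a '$file_size' column is a constant BIGINT vector whose value
// comes from the split, sized to the current output batch.
VectorPtr makeFileSizeColumn(
    const connector::hive::HiveConnectorSplit& split,
    vector_size_t size,
    memory::MemoryPool* pool) {
  return BaseVector::createConstant(
      BIGINT(), variant(split.fileSize), size, pool);
}
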
20 changes: 16 additions & 4 deletions velox/connectors/hive/iceberg/IcebergSplit.cpp
@@ -30,15 +30,22 @@ HiveIcebergSplit::HiveIcebergSplit(
_partitionKeys,
std::optional<int32_t> _tableBucketNumber,
const std::unordered_map<std::string, std::string>& _customSplitInfo,
- const std::shared_ptr<std::string>& _extraFileInfo)
+ const std::shared_ptr<std::string>& _extraFileInfo,
+ int64_t _fileSize,
+ int64_t _fileModifiedTime)
: HiveConnectorSplit(
_connectorId,
_filePath,
_fileFormat,
_start,
_length,
_partitionKeys,
- _tableBucketNumber) {
+ _tableBucketNumber,
+ _customSplitInfo,
+ _extraFileInfo,
+ {},
+ _fileSize,
+ _fileModifiedTime) {
// TODO: Deserialize _extraFileInfo to get deleteFiles.
}

@@ -54,7 +61,9 @@ HiveIcebergSplit::HiveIcebergSplit(
std::optional<int32_t> _tableBucketNumber,
const std::unordered_map<std::string, std::string>& _customSplitInfo,
const std::shared_ptr<std::string>& _extraFileInfo,
- std::vector<IcebergDeleteFile> _deletes)
+ std::vector<IcebergDeleteFile> _deletes,
+ int64_t _fileSize,
+ int64_t _fileModifiedTime)
: HiveConnectorSplit(
_connectorId,
_filePath,
@@ -64,6 +73,9 @@
_partitionKeys,
_tableBucketNumber,
_customSplitInfo,
- _extraFileInfo),
+ _extraFileInfo,
+ {},
+ _fileSize,
+ _fileModifiedTime),
deleteFiles(_deletes) {}
} // namespace facebook::velox::connector::hive::iceberg
8 changes: 6 additions & 2 deletions velox/connectors/hive/iceberg/IcebergSplit.h
@@ -36,7 +36,9 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit {
_partitionKeys = {},
std::optional<int32_t> _tableBucketNumber = std::nullopt,
const std::unordered_map<std::string, std::string>& _customSplitInfo = {},
- const std::shared_ptr<std::string>& _extraFileInfo = {});
+ const std::shared_ptr<std::string>& _extraFileInfo = {},
+ int64_t _fileSize = 0,
+ int64_t _fileModifiedTime = 0);

// For tests only
HiveIcebergSplit(
@@ -50,7 +52,9 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit {
std::optional<int32_t> _tableBucketNumber = std::nullopt,
const std::unordered_map<std::string, std::string>& _customSplitInfo = {},
const std::shared_ptr<std::string>& _extraFileInfo = {},
- std::vector<IcebergDeleteFile> deletes = {});
+ std::vector<IcebergDeleteFile> deletes = {},
+ int64_t _fileSize = 0,
+ int64_t _fileModifiedTime = 0);
};

} // namespace facebook::velox::connector::hive::iceberg
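
A hedged sketch of the test-only constructor with the new trailing arguments; the path and metadata values are invented.

#include <limits>
#include <optional>
#include "velox/connectors/hive/iceberg/IcebergSplit.h"

using namespace facebook::velox;
using facebook::velox::connector::hive::iceberg::HiveIcebergSplit;

HiveIcebergSplit makeExampleIcebergSplit() {
  // An Iceberg split now forwards the file metadata to the base
  // HiveConnectorSplit, so '$file_size' and '$file_modified_time' work for
  // Iceberg tables too. All values below are illustrative.
  return HiveIcebergSplit(
      "test-hive", // connectorId (hypothetical)
      "file:/tmp/iceberg/data-0.parquet", // data file path (hypothetical)
      dwio::common::FileFormat::PARQUET,
      /*start=*/0,
      /*length=*/std::numeric_limits<uint64_t>::max(),
      /*partitionKeys=*/{},
      /*tableBucketNumber=*/std::nullopt,
      /*customSplitInfo=*/{},
      /*extraFileInfo=*/nullptr,
      /*deletes=*/{},
      /*fileSize=*/2048,
      /*fileModifiedTime=*/1708300800);
}
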
4 changes: 3 additions & 1 deletion velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
@@ -121,7 +121,9 @@ class HiveIcebergTest : public HiveConnectorTestBase {
std::nullopt,
customSplitInfo,
nullptr,
- deleteFiles);
+ deleteFiles,
+ fileSize,
+ 0);
}

std::vector<RowVectorPtr> makeVectors(int32_t count, int32_t rowsPerVector) {
4 changes: 3 additions & 1 deletion velox/connectors/hive/tests/HiveConnectorUtilTest.cpp
@@ -78,7 +78,9 @@ TEST_F(HiveConnectorUtilTest, configureReaderOptions) {
std::nullopt,
customSplitInfo,
nullptr,
- serdeParameters);
+ serdeParameters,
+ 65535,
+ 1342143134);
};

auto performConfigure = [&]() {
70 changes: 70 additions & 0 deletions velox/exec/tests/TableScanTest.cpp
@@ -2291,6 +2291,76 @@ TEST_F(TableScanTest, path) {
op, {filePath}, fmt::format("SELECT '{}', * FROM tmp", pathValue));
}

TEST_F(TableScanTest, fileSizeAndModifiedTime) {
auto rowType = ROW({"a"}, {BIGINT()});
auto filePath = makeFilePaths(1)[0];
auto vector = makeVectors(1, 10, rowType)[0];
writeToFile(filePath->path, vector);
createDuckDbTable({vector});

static const char* kSize = "$file_size";
static const char* kModifiedTime = "$file_modified_time";

auto allColumns =
ROW({"a", kSize, kModifiedTime}, {BIGINT(), BIGINT(), BIGINT()});

auto assignments = allRegularColumns(rowType);
assignments[kSize] = synthesizedColumn(kSize, BIGINT());
assignments[kModifiedTime] = synthesizedColumn(kModifiedTime, BIGINT());

auto fileSizeValue = fmt::format("{}", filePath->fileSize());
auto fileTimeValue = fmt::format("{}", filePath->fileModifiedTime());

// Select and project both '$file_size', '$file_modified_time'.
auto op = PlanBuilder()
.startTableScan()
.outputType(allColumns)
.dataColumns(allColumns)
.assignments(assignments)
.endTableScan()
.planNode();
assertQuery(
op,
{filePath},
fmt::format("SELECT *, {}, {} FROM tmp", fileSizeValue, fileTimeValue));

auto filterTest = [&](const std::string& filter) {
auto tableHandle = makeTableHandle(
SubfieldFilters{},
parseExpr(filter, allColumns),
"hive_table",
allColumns);

// Use synthesized column in a filter but don't project it.
op = PlanBuilder()
.startTableScan()
.outputType(rowType)
.dataColumns(allColumns)
.tableHandle(tableHandle)
.assignments(assignments)
.endTableScan()
.planNode();
assertQuery(op, {filePath}, "SELECT * FROM tmp");

// Use synthesized column in a filter and project it out.
op = PlanBuilder()
.startTableScan()
.outputType(allColumns)
.dataColumns(allColumns)
.tableHandle(tableHandle)
.assignments(assignments)
.endTableScan()
.planNode();
assertQuery(
op,
{filePath},
fmt::format("SELECT *, {}, {} FROM tmp", fileSizeValue, fileTimeValue));
};

filterTest(fmt::format("\"{}\" = {}", kSize, fileSizeValue));
filterTest(fmt::format("\"{}\" = {}", kModifiedTime, fileTimeValue));
}
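
The test relies on TempFilePath::fileSize() and TempFilePath::fileModifiedTime(), presumably added in a file not shown in the truncated listing. A plausible local-file implementation, sketched with standard C++; the seconds-since-epoch convention is an assumption, not taken from this commit:

#include <chrono>
#include <cstdint>
#include <filesystem>
#include <utility>

// Sketch: derive the two metadata values for a local file.
std::pair<int64_t, int64_t> localFileMetadata(const std::filesystem::path& p) {
  const int64_t size = static_cast<int64_t>(std::filesystem::file_size(p));
  // std::filesystem::file_time_type has an implementation-defined epoch
  // before C++20's clock_cast, so treating it as seconds since the Unix
  // epoch is an approximation for illustration only.
  const int64_t mtime = std::chrono::duration_cast<std::chrono::seconds>(
                            std::filesystem::last_write_time(p)
                                .time_since_epoch())
                            .count();
  return {size, mtime};
}
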

TEST_F(TableScanTest, bucket) {
vector_size_t size = 1'000;
int numBatches = 5;
23 changes: 22 additions & 1 deletion velox/exec/tests/utils/HiveConnectorTestBase.cpp
@@ -132,6 +132,7 @@ HiveConnectorTestBase::makeHiveConnectorSplits(
.fileFormat(format)
.start(i * splitSize)
.length(splitSize)
.fileSize(fileSize)
.build();
splits.push_back(std::move(split));
}
@@ -171,7 +172,12 @@
const std::vector<std::shared_ptr<TempFilePath>>& filePaths) {
std::vector<std::shared_ptr<connector::ConnectorSplit>> splits;
for (auto filePath : filePaths) {
- splits.push_back(makeHiveConnectorSplit(filePath->path));
+ splits.push_back(makeHiveConnectorSplit(
+ filePath->path,
+ filePath->fileSize(),
+ filePath->fileModifiedTime(),
+ 0,
+ std::numeric_limits<uint64_t>::max()));
}
return splits;
}
@@ -187,6 +193,21 @@ HiveConnectorTestBase::makeHiveConnectorSplit(
.build();
}

std::shared_ptr<connector::ConnectorSplit>
HiveConnectorTestBase::makeHiveConnectorSplit(
const std::string& filePath,
int64_t fileSize,
int64_t fileModifiedTime,
uint64_t start,
uint64_t length) {
return HiveConnectorSplitBuilder(filePath)
.fileSize(fileSize)
.fileModifiedTime(fileModifiedTime)
.start(start)
.length(length)
.build();
}

// static
std::shared_ptr<connector::hive::HiveInsertTableHandle>
HiveConnectorTestBase::makeHiveInsertTableHandle(
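
Usage sketch of the new overload from inside a test deriving from HiveConnectorTestBase; the path and values are invented.

// Requires <limits>. All values are illustrative.
auto split = makeHiveConnectorSplit(
    "/tmp/test.dwrf",
    /*fileSize=*/4096,
    /*fileModifiedTime=*/1708300800,
    /*start=*/0,
    /*length=*/std::numeric_limits<uint64_t>::max());
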
47 changes: 46 additions & 1 deletion velox/exec/tests/utils/HiveConnectorTestBase.h
@@ -73,6 +73,13 @@ class HiveConnectorTestBase : public OperatorTestBase {
uint64_t start = 0,
uint64_t length = std::numeric_limits<uint64_t>::max());

static std::shared_ptr<connector::ConnectorSplit> makeHiveConnectorSplit(
const std::string& filePath,
int64_t fileSize,
int64_t fileModifiedTime,
uint64_t start,
uint64_t length);

/// Splits the file at 'filePath' into 'splitCount' splits. If the file is
/// not local, its size can be given as 'externalSize'.
static std::vector<std::shared_ptr<connector::hive::HiveConnectorSplit>>
@@ -202,6 +209,16 @@ class HiveConnectorSplitBuilder {
return *this;
}

HiveConnectorSplitBuilder& fileSize(int64_t fileSize) {
fileSize_ = fileSize;
return *this;
}

HiveConnectorSplitBuilder& fileModifiedTime(int64_t fileModifiedTime) {
fileModifiedTime_ = fileModifiedTime;
return *this;
}

HiveConnectorSplitBuilder& partitionKey(
std::string name,
std::optional<std::string> value) {
@@ -214,6 +231,24 @@
return *this;
}

HiveConnectorSplitBuilder& customSplitInfo(
const std::unordered_map<std::string, std::string>& customSplitInfo) {
customSplitInfo_ = customSplitInfo;
return *this;
}

HiveConnectorSplitBuilder& extraFileInfo(
const std::shared_ptr<std::string>& extraFileInfo) {
extraFileInfo_ = extraFileInfo;
return *this;
}

HiveConnectorSplitBuilder& serdeParameters(
const std::unordered_map<std::string, std::string>& serdeParameters) {
serdeParameters_ = serdeParameters;
return *this;
}

HiveConnectorSplitBuilder& connectorId(const std::string& connectorId) {
connectorId_ = connectorId;
return *this;
@@ -227,16 +262,26 @@
start_,
length_,
partitionKeys_,
- tableBucketNumber_);
+ tableBucketNumber_,
+ customSplitInfo_,
+ extraFileInfo_,
+ serdeParameters_,
+ fileSize_,
+ fileModifiedTime_);
}

private:
const std::string filePath_;
dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF};
uint64_t start_{0};
uint64_t length_{std::numeric_limits<uint64_t>::max()};
int64_t fileSize_{0};
int64_t fileModifiedTime_{0};
std::unordered_map<std::string, std::optional<std::string>> partitionKeys_;
std::optional<int32_t> tableBucketNumber_;
std::unordered_map<std::string, std::string> customSplitInfo_ = {};
std::shared_ptr<std::string> extraFileInfo_ = {};
std::unordered_map<std::string, std::string> serdeParameters_ = {};
std::string connectorId_ = kHiveConnectorId;
};

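
With the new setters the fluent builder can attach file metadata alongside the existing split fields; a sketch using only setters shown above, with invented values:

auto split = HiveConnectorSplitBuilder("/tmp/test.dwrf")
                 .fileFormat(dwio::common::FileFormat::DWRF)
                 .start(0)
                 .length(1'000'000)
                 .fileSize(4096)
                 .fileModifiedTime(1708300800)
                 .connectorId("test-hive")
                 .build();
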
