Skip to content

Commit

Permalink
Support for dictionary encoded INT96 timestamp in parquet files (face…
Browse files Browse the repository at this point in the history
…bookincubator#4680)

Summary:
Support timestamp reader for Parquet file format to read from dictionary-
encoded INT96 timestamps. Hive configs `kReadTimestampUnit` and
`kReadTimestampUnitSession` are added to control the precision when
reading timestamps from files.
Parquet documentation for INT96:
https://github.com/apache/parquet-format/pull/49/files#diff-0e877db0daf579f98a11e5e113b29250a2dcae3decb1e83a88db1e6f092bee96R149-R157

Pull Request resolved: facebookincubator#4680

Reviewed By: kevinwilfong

Differential Revision: D59883744

Pulled By: Yuhta

fbshipit-source-id: 1893792f7037dcbe216413d311ea95d0ae6bdc9b
  • Loading branch information
rui-mo authored and facebook-github-bot committed Jul 19, 2024
1 parent 524fc39 commit facd967
Show file tree
Hide file tree
Showing 19 changed files with 398 additions and 8 deletions.
10 changes: 10 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,16 @@ bool HiveConfig::s3UseProxyFromEnv() const {
return config_->get<bool>(kS3UseProxyFromEnv, false);
}

uint8_t HiveConfig::readTimestampUnit(const Config* session) const {
const auto unit = session->get<uint8_t>(
kReadTimestampUnitSession,
config_->get<uint8_t>(kReadTimestampUnit, 3 /*milli*/));
VELOX_CHECK(
unit == 3 || unit == 6 /*micro*/ || unit == 9 /*nano*/,
"Invalid timestamp unit.");
return unit;
}

uint8_t HiveConfig::parquetWriteTimestampUnit(const Config* session) const {
const auto unit = session->get<uint8_t>(
kParquetWriteTimestampUnitSession,
Expand Down
9 changes: 9 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,12 @@ class HiveConfig {
static constexpr const char* kS3UseProxyFromEnv =
"hive.s3.use-proxy-from-env";

// The unit for reading timestamps from files.
static constexpr const char* kReadTimestampUnit =
"hive.reader.timestamp-unit";
static constexpr const char* kReadTimestampUnitSession =
"hive.reader.timestamp_unit";

/// Timestamp unit for Parquet write through Arrow bridge.
static constexpr const char* kParquetWriteTimestampUnit =
"hive.parquet.writer.timestamp-unit";
Expand Down Expand Up @@ -333,6 +339,9 @@ class HiveConfig {

bool s3UseProxyFromEnv() const;

// Returns the timestamp unit used when reading timestamps from files.
uint8_t readTimestampUnit(const Config* session) const;

/// Returns the timestamp unit used when writing timestamps into Parquet
/// through Arrow bridge. 0: second, 3: milli, 6: micro, 9: nano.
uint8_t parquetWriteTimestampUnit(const Config* session) const;
Expand Down
8 changes: 7 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,9 @@ void configureRowReaderOptions(
const std::shared_ptr<common::ScanSpec>& scanSpec,
std::shared_ptr<common::MetadataFilter> metadataFilter,
const RowTypePtr& rowType,
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit) {
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit,
const std::shared_ptr<const HiveConfig>& hiveConfig,
const Config* sessionProperties) {
auto skipRowsIt =
tableParameters.find(dwio::common::TableParameter::kSkipHeaderLineCount);
if (skipRowsIt != tableParameters.end()) {
Expand All @@ -582,6 +584,10 @@ void configureRowReaderOptions(
rowReaderOptions.setMetadataFilter(std::move(metadataFilter));
rowReaderOptions.setRequestedType(rowType);
rowReaderOptions.range(hiveSplit->start, hiveSplit->length);
if (hiveConfig && sessionProperties) {
rowReaderOptions.setTimestampPrecision(static_cast<TimestampPrecision>(
hiveConfig->readTimestampUnit(sessionProperties)));
}
}

namespace {
Expand Down
4 changes: 3 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ void configureRowReaderOptions(
const std::shared_ptr<common::ScanSpec>& scanSpec,
std::shared_ptr<common::MetadataFilter> metadataFilter,
const RowTypePtr& rowType,
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit);
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit,
const std::shared_ptr<const HiveConfig>& hiveConfig = nullptr,
const Config* sessionProperties = nullptr);

bool testFilters(
const common::ScanSpec* scanSpec,
Expand Down
4 changes: 3 additions & 1 deletion velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,9 @@ void SplitReader::createReader(
scanSpec_,
std::move(metadataFilter),
ROW(std::move(columnNames), std::move(columnTypes)),
hiveSplit_);
hiveSplit_,
hiveConfig_,
connectorQueryCtx_->sessionProperties());
}

bool SplitReader::checkIfSplitIsEmpty(
Expand Down
44 changes: 43 additions & 1 deletion velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ namespace facebook::velox::parquet {
using thrift::Encoding;
using thrift::PageHeader;

struct __attribute__((__packed__)) Int96Timestamp {
int32_t days;
uint64_t nanos;
};

void PageReader::seekToPage(int64_t row) {
defineDecoder_.reset();
repeatDecoder_.reset();
Expand Down Expand Up @@ -371,6 +376,42 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
}
break;
}
case thrift::Type::INT96: {
auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp);
dictionary_.values = AlignedBuffer::allocate<char>(numVeloxBytes, &pool_);
auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp);
if (pageData_) {
memcpy(dictionary_.values->asMutable<char>(), pageData_, numBytes);
} else {
dwio::common::readBytes(
numBytes,
inputStream_.get(),
dictionary_.values->asMutable<char>(),
bufferStart_,
bufferEnd_);
}
// Expand the Parquet type length values to Velox type length.
// We start from the end to allow in-place expansion.
auto values = dictionary_.values->asMutable<Timestamp>();
auto parquetValues = dictionary_.values->asMutable<char>();

for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
// Convert the timestamp into seconds and nanos since the Unix epoch,
// 00:00:00.000000 on 1 January 1970.
int64_t nanos;
memcpy(
&nanos,
parquetValues + i * sizeof(Int96Timestamp),
sizeof(int64_t));
int32_t days;
memcpy(
&days,
parquetValues + i * sizeof(Int96Timestamp) + sizeof(int64_t),
sizeof(int32_t));
values[i] = Timestamp::fromDaysAndNanos(days, nanos);
}
break;
}
case thrift::Type::BYTE_ARRAY: {
dictionary_.values =
AlignedBuffer::allocate<StringView>(dictionary_.numValues, &pool_);
Expand Down Expand Up @@ -461,7 +502,6 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
VELOX_UNSUPPORTED(
"Parquet type {} not supported for dictionary", parquetType);
}
case thrift::Type::INT96:
default:
VELOX_UNSUPPORTED(
"Parquet type {} not supported for dictionary", parquetType);
Expand All @@ -488,6 +528,8 @@ int32_t parquetTypeBytes(thrift::Type::type type) {
case thrift::Type::INT64:
case thrift::Type::DOUBLE:
return 8;
case thrift::Type::INT96:
return 12;
default:
VELOX_FAIL("Type does not have a byte width {}", type);
}
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "velox/dwio/parquet/reader/RepeatedColumnReader.h"
#include "velox/dwio/parquet/reader/StringColumnReader.h"
#include "velox/dwio/parquet/reader/StructColumnReader.h"
#include "velox/dwio/parquet/reader/TimestampColumnReader.h"

namespace facebook::velox::parquet {

Expand Down Expand Up @@ -74,6 +75,10 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
return std::make_unique<BooleanColumnReader>(
requestedType, fileType, params, scanSpec);

case TypeKind::TIMESTAMP:
return std::make_unique<TimestampColumnReader>(
requestedType, fileType, params, scanSpec);

default:
VELOX_FAIL(
"buildReader unhandled type: " +
Expand Down
11 changes: 9 additions & 2 deletions velox/dwio/parquet/reader/ParquetData.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,24 @@ class ParquetParams : public dwio::common::FormatParams {
memory::MemoryPool& pool,
dwio::common::ColumnReaderStatistics& stats,
const FileMetaDataPtr metaData,
const date::time_zone* sessionTimezone)
const date::time_zone* sessionTimezone,
TimestampPrecision timestampPrecision)
: FormatParams(pool, stats),
metaData_(metaData),
sessionTimezone_(sessionTimezone) {}
sessionTimezone_(sessionTimezone),
timestampPrecision_(timestampPrecision) {}
std::unique_ptr<dwio::common::FormatData> toFormatData(
const std::shared_ptr<const dwio::common::TypeWithId>& type,
const common::ScanSpec& scanSpec) override;

TimestampPrecision timestampPrecision() const {
return timestampPrecision_;
}

private:
const FileMetaDataPtr metaData_;
const date::time_zone* sessionTimezone_;
const TimestampPrecision timestampPrecision_;
};

/// Format-specific data created for each leaf column of a Parquet rowgroup.
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,8 @@ class ParquetRowReader::Impl {
pool_,
columnReaderStats_,
readerBase_->fileMetaData(),
readerBase->sessionTimezone());
readerBase->sessionTimezone(),
options_.timestampPrecision());
requestedType_ = options_.requestedType() ? options_.requestedType()
: readerBase_->schema();
columnReader_ = ParquetColumnReader::build(
Expand Down
85 changes: 85 additions & 0 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/dwio/parquet/reader/IntegerColumnReader.h"
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"

namespace facebook::velox::parquet {

class TimestampColumnReader : public IntegerColumnReader {
public:
TimestampColumnReader(
const TypePtr& requestedType,
std::shared_ptr<const dwio::common::TypeWithId> fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
: IntegerColumnReader(requestedType, fileType, params, scanSpec),
timestampPrecision_(params.timestampPrecision()) {}

bool hasBulkPath() const override {
return false;
}

void getValues(RowSet rows, VectorPtr* result) override {
getFlatValues<Timestamp, Timestamp>(rows, result, requestedType_);
if (allNull_) {
return;
}

// Adjust timestamp nanos to the requested precision.
VectorPtr resultVector = *result;
auto rawValues =
resultVector->asUnchecked<FlatVector<Timestamp>>()->mutableRawValues();
for (auto i = 0; i < numValues_; ++i) {
if (resultVector->isNullAt(i)) {
continue;
}
const auto timestamp = rawValues[i];
uint64_t nanos = timestamp.getNanos();
switch (timestampPrecision_) {
case TimestampPrecision::kMilliseconds:
nanos = nanos / 1'000'000 * 1'000'000;
break;
case TimestampPrecision::kMicroseconds:
nanos = nanos / 1'000 * 1'000;
break;
case TimestampPrecision::kNanoseconds:
break;
}
rawValues[i] = Timestamp(timestamp.getSeconds(), nanos);
}
}

void read(
vector_size_t offset,
RowSet rows,
const uint64_t* /*incomingNulls*/) override {
auto& data = formatData_->as<ParquetData>();
// Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length.
prepareRead<int128_t>(offset, rows, nullptr);
readCommon<IntegerColumnReader, true>(rows);
readOffset_ += rows.back() + 1;
}

private:
// The requested precision can be specified from HiveConfig to read timestamp
// from Parquet.
TimestampPrecision timestampPrecision_;
};

} // namespace facebook::velox::parquet
Binary file not shown.
13 changes: 13 additions & 0 deletions velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,19 @@ TEST_F(E2EFilterTest, integerDictionary) {
20);
}

TEST_F(E2EFilterTest, timestampDictionary) {
options_.dataPageSize = 4 * 1024;
options_.writeInt96AsTimestamp = true;

testWithTypes(
"timestamp_val_0:timestamp,"
"timestamp_val_1:timestamp",
[&]() {},
true,
{"timestamp_val_0", "timestamp_val_1"},
20);
}

TEST_F(E2EFilterTest, floatAndDoubleDirect) {
options_.enableDictionary = false;
options_.dataPageSize = 4 * 1024;
Expand Down
Loading

0 comments on commit facd967

Please sign in to comment.