Skip to content

Commit

Permalink
Support for INT64 timestamp in parquet files
Browse files Browse the repository at this point in the history
  • Loading branch information
mskapilks committed Jan 10, 2024
1 parent f15e096 commit 82d5925
Show file tree
Hide file tree
Showing 14 changed files with 347 additions and 5 deletions.
3 changes: 3 additions & 0 deletions velox/dwio/common/SelectiveColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ void SelectiveColumnReader::getIntValues(
VELOX_FAIL("Unsupported value size: {}", valueSize_);
}
break;
case TypeKind::TIMESTAMP:
getFlatValues<Timestamp, Timestamp>(rows, result, requestedType);
break;
default:
VELOX_FAIL(
"Not a valid type for integer reader: {}", requestedType->toString());
Expand Down
93 changes: 93 additions & 0 deletions velox/dwio/common/TimestampDecoder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cstring>

#include "velox/dwio/common/DirectDecoder.h"

namespace facebook::velox::dwio::common {

/// Unit in which an integer-encoded timestamp column stores its values.
enum TIMESTAMP_PRECISION {
  /// Stored integers count milliseconds.
  MILLIS = 0,
  /// Stored integers count microseconds.
  MICROS = 1
};

/// Decoder for integer-encoded timestamp values.
///
/// Every non-null value is read as an int64_t through the DirectDecoder<false>
/// base. When the visitor's DataType is int128_t the integer is first
/// converted to a Timestamp according to the configured precision and the
/// Timestamp's 16 bytes are handed to the visitor as an int128_t; for any
/// other DataType the raw int64_t is passed through unchanged.
class TimestampDecoder : public DirectDecoder<false> {
 public:
  /// @param precision Unit of the stored integers; anything other than MILLIS
  ///        is decoded as MICROS.
  /// @param input Stream holding the encoded values; ownership is taken.
  /// @param useVInts Forwarded unchanged to DirectDecoder<false>.
  /// @param numBytes Forwarded unchanged to DirectDecoder<false>.
  /// @param bigEndian Forwarded unchanged to DirectDecoder<false>.
  TimestampDecoder(
      TIMESTAMP_PRECISION precision,
      std::unique_ptr<dwio::common::SeekableInputStream> input,
      bool useVInts,
      uint32_t numBytes,
      bool bigEndian = false)
      : DirectDecoder<false>{std::move(input), useVInts, numBytes, bigEndian},
        precision_(precision) {}

  /// Walks the rows selected by 'visitor', feeding it one decoded value (or
  /// null) at a time until the visitor reports the end.
  ///
  /// @tparam hasNulls Whether 'nulls' must be consulted.
  /// @tparam Visitor Column visitor; must expose start(), allowNulls(),
  ///         checkAndSkipNulls(), processNull(), process(), a static 'dense'
  ///         flag and a DataType alias.
  /// @param nulls Null bitmap for the rows; only read when hasNulls is true.
  /// @param visitor Receives the decoded values.
  /// @param useFastPath Kept so callers can pass the same arguments as to the
  ///        other decoders; this implementation never consults it.
  template <bool hasNulls, typename Visitor>
  void readWithVisitor(
      const uint64_t* FOLLY_NULLABLE nulls,
      Visitor visitor,
      bool useFastPath = true) {
    int32_t current = visitor.start();
    skip<hasNulls>(current, 0, nulls);
    const bool allowNulls = hasNulls && visitor.allowNulls();
    for (;;) {
      bool atEnd = false;
      int32_t toSkip;
      if (hasNulls) {
        if (!allowNulls) {
          // The visitor does not accept nulls: let it advance past them.
          toSkip = visitor.checkAndSkipNulls(nulls, current, atEnd);
          if (!Visitor::dense) {
            skip<false>(toSkip, current, nullptr);
          }
          if (atEnd) {
            return;
          }
        } else {
          if (bits::isBitNull(nulls, current)) {
            // A null consumes no bytes from the stream; go straight to the
            // row bookkeeping below.
            toSkip = visitor.processNull(atEnd);
            goto skip;
          }
        }
      }
      if constexpr (std::is_same_v<typename Visitor::DataType, int128_t>) {
        const auto units = IntDecoder<false>::template readInt<int64_t>();
        Timestamp timestamp;
        if (precision_ == TIMESTAMP_PRECISION::MILLIS) {
          timestamp = facebook::velox::util::fromUTCMillisParquet(units);
        } else {
          timestamp = facebook::velox::util::fromUTCMicrosParquet(units);
        }

        // Copy the bytes instead of dereferencing a reinterpret_cast'ed
        // pointer: reading a Timestamp object through an int128_t lvalue
        // violates strict aliasing and may be a misaligned 16-byte load.
        static_assert(
            sizeof(Timestamp) == sizeof(int128_t),
            "Timestamp must be exactly as wide as int128_t");
        int128_t encoded;
        std::memcpy(&encoded, &timestamp, sizeof(encoded));
        toSkip = visitor.process(encoded, atEnd);
      } else {
        toSkip = visitor.process(
            IntDecoder<false>::template readInt<int64_t>(), atEnd);
      }
    skip:
      ++current;
      if (toSkip) {
        skip<hasNulls>(toSkip, current, nulls);
        current += toSkip;
      }
      if (atEnd) {
        return;
      }
    }
  }

 private:
  // Unit of the stored integers, fixed at construction.
  TIMESTAMP_PRECISION precision_;
};
} // namespace facebook::velox::dwio::common
72 changes: 72 additions & 0 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/dwio/common/BufferUtil.h"
#include "velox/dwio/common/ColumnVisitors.h"
#include "velox/dwio/common/TimestampDecoder.h"
#include "velox/dwio/parquet/thrift/ThriftTransport.h"
#include "velox/vector/FlatVector.h"

Expand Down Expand Up @@ -393,6 +394,32 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
// We start from the end to allow in-place expansion.
values[i] = parquetValues[i];
}
} else if (
parquetType == thrift::Type::INT64 &&
type_->logicalType_.has_value()) {
auto logicalType = type_->logicalType_.value();
if (logicalType.__isset.TIMESTAMP) {
VELOX_CHECK(
logicalType.TIMESTAMP.isAdjustedToUTC,
"Only UTC adjusted Timestamp is supported");
auto values = dictionary_.values->asMutable<Timestamp>();
auto parquetValues = dictionary_.values->asMutable<char>();
int64_t units;

if (logicalType.TIMESTAMP.unit.__isset.MICROS) {
for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
memcpy(&units, parquetValues + i * typeSize, typeSize);
values[i] = facebook::velox::util::fromUTCMicrosParquet(units);
}
} else if (logicalType.TIMESTAMP.unit.__isset.MILLIS) {
for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
memcpy(&units, parquetValues + i * typeSize, typeSize);
values[i] = facebook::velox::util::fromUTCMillisParquet(units);
}
} else {
VELOX_NYI("Unsupported timestamp unit");
}
}
}
break;
}
Expand Down Expand Up @@ -656,6 +683,49 @@ void PageReader::makeDecoder() {
type_->typeLength_,
true);
break;
case thrift::Type::INT64:
if (type_->logicalType_.has_value()) {
auto logicalType = type_->logicalType_.value();
if (logicalType.__isset.TIMESTAMP) {
VELOX_CHECK(
logicalType.TIMESTAMP.isAdjustedToUTC,
"Only UTC adjusted Timestamp is supported");
if (logicalType.TIMESTAMP.unit.__isset.MICROS) {
timestampDecoder_ = std::make_unique<
dwio::common::TimestampDecoder>(
dwio::common::TIMESTAMP_PRECISION::MICROS,
std::make_unique<dwio::common::SeekableArrayInputStream>(
pageData_, encodedDataSize_),
false,
parquetTypeBytes(type_->parquetType_.value()));
} else if (logicalType.TIMESTAMP.unit.__isset.MILLIS) {
timestampDecoder_ = std::make_unique<
dwio::common::TimestampDecoder>(
dwio::common::TIMESTAMP_PRECISION::MILLIS,
std::make_unique<dwio::common::SeekableArrayInputStream>(
pageData_, encodedDataSize_),
false,
parquetTypeBytes(type_->parquetType_.value()));
} else {
VELOX_NYI("Timestamp unit not supported");
}
} else {
directDecoder_ =
std::make_unique<dwio::common::DirectDecoder<true>>(
std::make_unique<dwio::common::SeekableArrayInputStream>(
pageData_, encodedDataSize_),
false,
parquetTypeBytes(type_->parquetType_.value()));
}
} else {
directDecoder_ =
std::make_unique<dwio::common::DirectDecoder<true>>(
std::make_unique<dwio::common::SeekableArrayInputStream>(
pageData_, encodedDataSize_),
false,
parquetTypeBytes(type_->parquetType_.value()));
}
break;
default: {
directDecoder_ = std::make_unique<dwio::common::DirectDecoder<true>>(
std::make_unique<dwio::common::SeekableArrayInputStream>(
Expand Down Expand Up @@ -708,6 +778,8 @@ void PageReader::skip(int64_t numRows) {
stringDecoder_->skip(toSkip);
} else if (booleanDecoder_) {
booleanDecoder_->skip(toSkip);
} else if (timestampDecoder_) {
timestampDecoder_->skip(toSkip);
} else if (deltaBpDecoder_) {
deltaBpDecoder_->skip(toSkip);
} else {
Expand Down
20 changes: 16 additions & 4 deletions velox/dwio/parquet/reader/PageReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "velox/dwio/common/BitConcatenation.h"
#include "velox/dwio/common/DirectDecoder.h"
#include "velox/dwio/common/SelectiveColumnReader.h"
#include "velox/dwio/common/TimestampDecoder.h"
#include "velox/dwio/common/compression/Compression.h"
#include "velox/dwio/parquet/reader/BooleanDecoder.h"
#include "velox/dwio/parquet/reader/DeltaBpDecoder.h"
Expand Down Expand Up @@ -273,8 +274,13 @@ class PageReader {
nullsFromFastPath = false;
deltaBpDecoder_->readWithVisitor<true>(nulls, visitor);
} else {
directDecoder_->readWithVisitor<true>(
nulls, visitor, nullsFromFastPath);
if (directDecoder_) {
directDecoder_->readWithVisitor<true>(
nulls, visitor, nullsFromFastPath);
} else {
timestampDecoder_->readWithVisitor<true>(
nulls, visitor, nullsFromFastPath);
}
}
} else {
if (isDictionary()) {
Expand All @@ -283,8 +289,13 @@ class PageReader {
} else if (encoding_ == thrift::Encoding::DELTA_BINARY_PACKED) {
deltaBpDecoder_->readWithVisitor<false>(nulls, visitor);
} else {
directDecoder_->readWithVisitor<false>(
nulls, visitor, !this->type_->type()->isShortDecimal());
if (directDecoder_) {
directDecoder_->readWithVisitor<false>(
nulls, visitor, !this->type_->type()->isShortDecimal());
} else {
timestampDecoder_->readWithVisitor<false>(
nulls, visitor, nullsFromFastPath);
}
}
}
}
Expand Down Expand Up @@ -487,6 +498,7 @@ class PageReader {
std::unique_ptr<StringDecoder> stringDecoder_;
std::unique_ptr<BooleanDecoder> booleanDecoder_;
std::unique_ptr<DeltaBpDecoder> deltaBpDecoder_;
std::unique_ptr<dwio::common::TimestampDecoder> timestampDecoder_;
// Add decoders for other encodings here.
};

Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "velox/dwio/parquet/reader/Statistics.h"
#include "velox/dwio/parquet/reader/StringColumnReader.h"
#include "velox/dwio/parquet/reader/StructColumnReader.h"
#include "velox/dwio/parquet/reader/TimestampColumnReader.h"
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"

namespace facebook::velox::parquet {
Expand Down Expand Up @@ -76,6 +77,10 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
return std::make_unique<BooleanColumnReader>(
requestedType, fileType, params, scanSpec);

case TypeKind::TIMESTAMP:
return std::make_unique<TimestampColumnReader>(
requestedType, fileType, params, scanSpec);

default:
VELOX_FAIL(
"buildReader unhandled type: " +
Expand Down
32 changes: 31 additions & 1 deletion velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,10 +387,40 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
int32_t type_length =
schemaElement.__isset.type_length ? schemaElement.type_length : 0;
std::vector<std::shared_ptr<const dwio::common::TypeWithId>> children;
const std::optional<thrift::LogicalType> logicalType_ =
std::optional<thrift::LogicalType> logicalType_ =
schemaElement.__isset.logicalType
? std::optional<thrift::LogicalType>(schemaElement.logicalType)
: std::nullopt;

if (veloxType->kind() == TypeKind::TIMESTAMP &&
schemaElement.type == thrift::Type::INT64 &&
!logicalType_.has_value()) {
// Construct logical type from deprecated converted type of Parquet
thrift::TimestampType timestamp;
timestamp.__set_isAdjustedToUTC(true);
thrift::TimeUnit unit;

if (schemaElement.converted_type ==
thrift::ConvertedType::TIMESTAMP_MICROS) {
thrift::MicroSeconds micros;
unit.__set_MICROS(micros);
} else if (
schemaElement.converted_type ==
thrift::ConvertedType::TIMESTAMP_MILLIS) {
thrift::MilliSeconds millis;
unit.__set_MILLIS(millis);
} else {
VELOX_NYI(
"{} Timestamp unit not supported", schemaElement.converted_type);
}

timestamp.__set_unit(unit);
thrift::LogicalType newLogicalType;
newLogicalType.__set_TIMESTAMP(timestamp);

logicalType_ = std::optional<thrift::LogicalType>(newLogicalType);
}

std::shared_ptr<const ParquetTypeWithId> leafTypePtr =
std::make_shared<const ParquetTypeWithId>(
veloxType,
Expand Down
49 changes: 49 additions & 0 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/dwio/parquet/reader/IntegerColumnReader.h"
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"

namespace facebook::velox::parquet {

/// Column reader for TIMESTAMP columns, built on top of IntegerColumnReader.
///
/// Values are read through the integer reader machinery with a 16-byte
/// (int128_t) value slot, because a Velox Timestamp occupies 16 bytes.
class TimestampColumnReader : public IntegerColumnReader {
 public:
  /// All arguments are forwarded unchanged to IntegerColumnReader.
  TimestampColumnReader(
      const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
      std::shared_ptr<const dwio::common::TypeWithId> fileType,
      ParquetParams& params,
      common::ScanSpec& scanSpec)
      : IntegerColumnReader(requestedType, fileType, params, scanSpec) {}

  /// The bulk path is always disabled for this reader.
  bool hasBulkPath() const override {
    return false;
  }

  /// Reads the values for 'rows' and advances the read offset past the last
  /// requested row.
  ///
  /// @param offset Passed through to prepareRead().
  /// @param rows Rows to read. NOTE(review): rows.back() is used without an
  ///        emptiness check, so 'rows' is assumed non-empty - confirm with
  ///        callers.
  /// The third parameter (incoming nulls) is ignored; nullptr is passed to
  /// prepareRead() instead.
  void read(
      vector_size_t offset,
      RowSet rows,
      const uint64_t* /*incomingNulls*/) override {
    // Use int128_t as a workaround. Timestamp in Velox is of 16-byte length.
    prepareRead<int128_t>(offset, rows, nullptr);
    readCommon<IntegerColumnReader>(rows);
    readOffset_ += rows.back() + 1;
  }
};

} // namespace facebook::velox::parquet
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit 82d5925

Please sign in to comment.