Skip to content

Commit

Permalink
feat(Parquet): Add Int64 Timestamp support in reader
Browse files Browse the repository at this point in the history
  • Loading branch information
zuyu committed Dec 3, 2024
1 parent a0bbea2 commit 43dc8a9
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 52 deletions.
20 changes: 12 additions & 8 deletions velox/dwio/common/IntDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,16 +439,20 @@ inline T IntDecoder<isSigned>::readInt() {
}
if (bigEndian_) {
return readLittleEndianFromBigEndian<T>();
} else {
if constexpr (std::is_same_v<T, int128_t>) {
if (numBytes_ == 12) {
VELOX_DCHECK(!useVInts_, "Int96 should not be VInt encoded.");
return readInt96();
}
VELOX_NYI();
}

if constexpr (std::is_same_v<T, int128_t>) {
if (numBytes_ == 8) {
return readLongLE();
}

if (numBytes_ == 12) {
VELOX_DCHECK(!useVInts_, "Int96 should not be VInt encoded.");
return readInt96();
}
return readLongLE();
VELOX_NYI();
}
return readLongLE();
}

template <bool isSigned>
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,11 @@ TypePtr ReaderBase::convertType(
case thrift::Type::type::INT32:
return INTEGER();
case thrift::Type::type::INT64:
// INT64 annotated with the TIMESTAMP logical type (any time unit) is read as a timestamp.
if (schemaElement.__isset.logicalType &&
schemaElement.logicalType.__isset.TIMESTAMP) {
return TIMESTAMP();
}
return BIGINT();
case thrift::Type::type::INT96:
return TIMESTAMP(); // INT96 only maps to a timestamp
Expand Down
190 changes: 146 additions & 44 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,53 @@

#include "velox/dwio/parquet/reader/IntegerColumnReader.h"
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"

namespace facebook::velox::parquet {
namespace {

// Converts a raw Parquet INT64 timestamp `value`, expressed in `unit`
// (MILLIS, MICROS or NANOS), into a Timestamp built from a whole-seconds
// component and a sub-second nanoseconds component.
//
// The split uses floor division, so values before the epoch (negative
// `value`) yield a seconds component rounded towards negative infinity and a
// nanoseconds component in [0, 1'000'000'000). Plain truncating division
// would leave a negative remainder that wraps around when stored in the
// unsigned nanoseconds field (e.g. -1 ms would become seconds = 0 and
// nanos = 2^64 - 1'000'000).
//
// @param value Timestamp value read from the file, counted in `unit`.
// @param unit Thrift TimeUnit; exactly one of MILLIS / MICROS / NANOS is
// expected to be set, otherwise VELOX_UNREACHABLE() is hit.
// @return Timestamp{seconds, nanos}.
Timestamp toInt64Timestamp(const int64_t value, const thrift::TimeUnit& unit) {
  // Defaults describe a value that is already in whole seconds. They are only
  // observable if VELOX_UNREACHABLE() below returns, in which case the result
  // is {value, 0}.
  int64_t unitsPerSecond = 1;
  int64_t nanosPerUnit = 0;
  if (unit.__isset.MILLIS) {
    unitsPerSecond = 1'000;
    nanosPerUnit = 1'000'000;
  } else if (unit.__isset.MICROS) {
    unitsPerSecond = 1'000'000;
    nanosPerUnit = 1'000;
  } else if (unit.__isset.NANOS) {
    unitsPerSecond = 1'000'000'000;
    nanosPerUnit = 1;
  } else {
    VELOX_UNREACHABLE();
  }

  int64_t seconds = value / unitsPerSecond;
  int64_t remainder = value % unitsPerSecond;
  if (remainder < 0) {
    // C++ integer division truncates towards zero; step down to the previous
    // whole second so that the sub-second part becomes non-negative.
    --seconds;
    remainder += unitsPerSecond;
  }

  // remainder < unitsPerSecond, hence the product is below 1'000'000'000.
  const uint64_t nanos = static_cast<uint64_t>(remainder * nanosPerUnit);
  return {seconds, nanos};
}

// Range filter for Parquet Int64 Timestamp.
// TimestampRange specialization used when filtering Parquet INT64 timestamp
// columns. The column values reach the filter as raw int128_t integers; each
// one is converted with toInt64Timestamp() using the column's time unit and
// then compared against the inclusive [lower, upper] bounds.
class ParquetInt64TimestampRange final : public common::TimestampRange {
 public:
  // @param lower Inclusive lower bound of the accepted range.
  // @param upper Inclusive upper bound of the accepted range.
  // @param nullAllowed Forwarded unchanged to the TimestampRange base.
  // @param timestampUnit Time unit of the raw INT64 values in the file; a
  // copy is kept in this filter.
  ParquetInt64TimestampRange(
      const Timestamp& lower,
      const Timestamp& upper,
      bool nullAllowed,
      const thrift::TimeUnit& timestampUnit)
      : TimestampRange(lower, upper, nullAllowed),
        timestampUnit_(timestampUnit) {}

  // The reader hands INT64 timestamps over as int128_t. The value is narrowed
  // back to int64_t, converted to a Timestamp and range-checked.
  bool testInt128(int128_t value) const override {
    const auto raw = static_cast<int64_t>(value);
    const Timestamp converted = toInt64Timestamp(raw, timestampUnit_);
    return lower() <= converted && converted <= upper();
  }

  // Unit used to interpret the raw INT64 values.
  const thrift::TimeUnit timestampUnit_;
};

// Range filter for Parquet Int96 Timestamp.
class ParquetInt96TimestampRange : public common::TimestampRange {
public:
Expand Down Expand Up @@ -54,7 +97,25 @@ class TimestampColumnReader : public IntegerColumnReader {
ParquetParams& params,
common::ScanSpec& scanSpec)
: IntegerColumnReader(requestedType, fileType, params, scanSpec),
timestampPrecision_(params.timestampPrecision()) {}
timestampPrecision_(params.timestampPrecision()),
isInt96_{
std::static_pointer_cast<const ParquetTypeWithId>(fileType_)
->parquetType_ == thrift::Type::INT96},
isInt64_{
std::static_pointer_cast<const ParquetTypeWithId>(fileType_)
->parquetType_ == thrift::Type::INT64} {
if (isInt64_) {
const auto logicalType =
std::static_pointer_cast<const ParquetTypeWithId>(fileType_)
->logicalType_;
VELOX_CHECK(logicalType);
VELOX_CHECK(logicalType->__isset.TIMESTAMP);
timestampUnit_ = logicalType->TIMESTAMP.unit;
VELOX_CHECK(
timestampUnit_.__isset.MILLIS || timestampUnit_.__isset.MICROS ||
timestampUnit_.__isset.NANOS);
}
}

bool hasBulkPath() const override {
return false;
Expand All @@ -70,29 +131,54 @@ class TimestampColumnReader : public IntegerColumnReader {
VectorPtr resultVector = *result;
auto rawValues =
resultVector->asUnchecked<FlatVector<Timestamp>>()->mutableRawValues();
for (auto i = 0; i < numValues_; ++i) {
if (resultVector->isNullAt(i)) {
continue;
if (isInt64_) {
for (auto i = 0; i < numValues_; ++i) {
if (resultVector->isNullAt(i)) {
continue;
}

const int128_t encoded = reinterpret_cast<int128_t&>(rawValues[i]);
const auto timestamp = toInt64Timestamp(encoded, timestampUnit_);
uint64_t nanos = timestamp.getNanos();
switch (timestampPrecision_) {
case TimestampPrecision::kMilliseconds:
nanos = nanos / 1'000'000 * 1'000'000;
break;
case TimestampPrecision::kMicroseconds:
nanos = nanos / 1'000 * 1'000;
break;
case TimestampPrecision::kNanoseconds:
break;
}
rawValues[i] = Timestamp(timestamp.getSeconds(), nanos);
}

// Convert int128_t to Timestamp by extracting days and nanos.
const int128_t encoded = reinterpret_cast<int128_t&>(rawValues[i]);
const int32_t days = static_cast<int32_t>(encoded >> 64);
uint64_t nanos = encoded & ((((1ULL << 63) - 1ULL) << 1) + 1);
const auto timestamp = Timestamp::fromDaysAndNanos(days, nanos);

nanos = timestamp.getNanos();
switch (timestampPrecision_) {
case TimestampPrecision::kMilliseconds:
nanos = nanos / 1'000'000 * 1'000'000;
break;
case TimestampPrecision::kMicroseconds:
nanos = nanos / 1'000 * 1'000;
break;
case TimestampPrecision::kNanoseconds:
break;
} else if (isInt96_) {
for (auto i = 0; i < numValues_; ++i) {
if (resultVector->isNullAt(i)) {
continue;
}

// Convert int128_t to Timestamp by extracting days and nanos.
const int128_t encoded = reinterpret_cast<int128_t&>(rawValues[i]);
const int32_t days = static_cast<int32_t>(encoded >> 64);
uint64_t nanos = encoded & ((((1ULL << 63) - 1ULL) << 1) + 1);
const auto timestamp = Timestamp::fromDaysAndNanos(days, nanos);

nanos = timestamp.getNanos();
switch (timestampPrecision_) {
case TimestampPrecision::kMilliseconds:
nanos = nanos / 1'000'000 * 1'000'000;
break;
case TimestampPrecision::kMicroseconds:
nanos = nanos / 1'000 * 1'000;
break;
case TimestampPrecision::kNanoseconds:
break;
}
rawValues[i] = Timestamp(timestamp.getSeconds(), nanos);
}
rawValues[i] = Timestamp(timestamp.getSeconds(), nanos);
} else {
VELOX_UNREACHABLE();
}
}

Expand All @@ -106,34 +192,47 @@ class TimestampColumnReader : public IntegerColumnReader {
const RowSet& rows,
ExtractValues extractValues) {
if (auto* range = dynamic_cast<common::TimestampRange*>(filter)) {
// Convert TimestampRange to ParquetInt96TimestampRange.
ParquetInt96TimestampRange newRange = ParquetInt96TimestampRange(
range->lower(), range->upper(), range->nullAllowed());
this->readWithVisitor(
rows,
dwio::common::ColumnVisitor<
int128_t,
ParquetInt96TimestampRange,
ExtractValues,
isDense>(newRange, this, rows, extractValues));
} else {
this->readWithVisitor(
rows,
dwio::common::
ColumnVisitor<int128_t, TFilter, ExtractValues, isDense>(
*reinterpret_cast<TFilter*>(filter),
this,
rows,
extractValues));
if (isInt64_) {
ParquetInt64TimestampRange newRange{
range->lower(),
range->upper(),
range->nullAllowed(),
timestampUnit_};
return this->readWithVisitor(
rows,
dwio::common::ColumnVisitor<
int128_t,
ParquetInt64TimestampRange,
ExtractValues,
isDense>(newRange, this, rows, extractValues));
}

if (isInt96_) {
// Convert TimestampRange to ParquetInt96TimestampRange.
ParquetInt96TimestampRange newRange(
range->lower(), range->upper(), range->nullAllowed());
return this->readWithVisitor(
rows,
dwio::common::ColumnVisitor<
int128_t,
ParquetInt96TimestampRange,
ExtractValues,
isDense>(newRange, this, rows, extractValues));
}

VELOX_UNREACHABLE();
}
return;

this->readWithVisitor(
rows,
dwio::common::ColumnVisitor<int128_t, TFilter, ExtractValues, isDense>(
*reinterpret_cast<TFilter*>(filter), this, rows, extractValues));
}

void read(
int64_t offset,
const RowSet& rows,
const uint64_t* /*incomingNulls*/) override {
auto& data = formatData_->as<ParquetData>();
// Use int128_t as a workaround. Timestamp in Velox is of 16-byte length.
prepareRead<int128_t>(offset, rows, nullptr);
readCommon<TimestampColumnReader, true>(rows);
Expand All @@ -143,7 +242,10 @@ class TimestampColumnReader : public IntegerColumnReader {
private:
// The requested precision can be specified from HiveConfig to read timestamp
// from Parquet.
TimestampPrecision timestampPrecision_;
const TimestampPrecision timestampPrecision_;
const bool isInt96_;
const bool isInt64_;
thrift::TimeUnit timestampUnit_; // Only set in Int64 Timestamp.
};

} // namespace facebook::velox::parquet

0 comments on commit 43dc8a9

Please sign in to comment.