From d7546229b1e648681da77dcf303519db8b593cdb Mon Sep 17 00:00:00 2001 From: Kapil Singh Date: Wed, 10 Jan 2024 15:06:54 +0530 Subject: [PATCH 1/6] Support for INT64 timestamp in parquet files Resolve comments Fix build PR comments Remove reinterpret_cast Fix compile PR comments Update parquet files Refactor Fix formatting Fix compile PR comment Fix decimal tests Typo Remove timestamp decoder Remove white space Remove import --- velox/dwio/common/DirectDecoder.h | 27 ++++- velox/dwio/common/SelectiveColumnReader.cpp | 3 + velox/dwio/parquet/reader/PageReader.cpp | 58 ++++++++- velox/dwio/parquet/reader/ParquetReader.cpp | 36 +++++- .../examples/int64_micros_dictionary.parquet | Bin 0 -> 641 bytes .../tests/examples/int64_micros_plain.parquet | Bin 0 -> 608 bytes .../int64_millis_compatibility.parquet | Bin 0 -> 517 bytes .../examples/int64_millis_dictionary.parquet | Bin 0 -> 641 bytes .../tests/examples/int64_millis_plain.parquet | Bin 0 -> 608 bytes .../tests/reader/ParquetTableScanTest.cpp | 112 ++++++++++++++++++ 10 files changed, 228 insertions(+), 8 deletions(-) create mode 100644 velox/dwio/parquet/tests/examples/int64_micros_dictionary.parquet create mode 100644 velox/dwio/parquet/tests/examples/int64_micros_plain.parquet create mode 100644 velox/dwio/parquet/tests/examples/int64_millis_compatibility.parquet create mode 100644 velox/dwio/parquet/tests/examples/int64_millis_dictionary.parquet create mode 100644 velox/dwio/parquet/tests/examples/int64_millis_plain.parquet diff --git a/velox/dwio/common/DirectDecoder.h b/velox/dwio/common/DirectDecoder.h index 4cd9396d0936..d30697564b94 100644 --- a/velox/dwio/common/DirectDecoder.h +++ b/velox/dwio/common/DirectDecoder.h @@ -30,8 +30,10 @@ class DirectDecoder : public IntDecoder { std::unique_ptr input, bool useVInts, uint32_t numBytes, - bool bigEndian = false) - : IntDecoder{std::move(input), useVInts, numBytes, bigEndian} {} + bool bigEndian = false, + std::optional precision = std::nullopt) + : IntDecoder{std::move(input), useVInts, numBytes, bigEndian}, + precision_(precision) {} void seekToRowGroup(dwio::common::PositionProvider&) override; @@ -92,7 +94,24 @@ class DirectDecoder : public IntDecoder { } else if constexpr (std::is_same_v< typename Visitor::DataType, int128_t>) { - toSkip = visitor.process(super::template readInt(), atEnd); + if (precision_.has_value()) { + auto units = super::template readInt(); + Timestamp timestamp; + if (precision_.value() == TimestampPrecision::kMilliseconds) { + timestamp = Timestamp::fromMillis(units); + } else if (precision_.value() == TimestampPrecision::kMicroseconds) { + timestamp = Timestamp::fromMicros(units); + } else { + VELOX_NYI( + "Unsupported timestamp unit. Only kMillis and kMicros supported."); + } + + int128_t value; + memcpy(&value, ×tamp, sizeof(int128_t)); + toSkip = visitor.process(value, atEnd); + } else { + toSkip = visitor.process(super::template readInt(), atEnd); + } } else { toSkip = visitor.process(super::template readInt(), atEnd); } @@ -111,6 +130,8 @@ class DirectDecoder : public IntDecoder { private: using super = IntDecoder; + const std::optional precision_; + float readFloat() { float temp; auto buffer = readFixed(sizeof(float), &temp); diff --git a/velox/dwio/common/SelectiveColumnReader.cpp b/velox/dwio/common/SelectiveColumnReader.cpp index 278f09291355..fa2ba003fc70 100644 --- a/velox/dwio/common/SelectiveColumnReader.cpp +++ b/velox/dwio/common/SelectiveColumnReader.cpp @@ -241,6 +241,9 @@ void SelectiveColumnReader::getIntValues( VELOX_FAIL("Unsupported value size: {}", valueSize_); } break; + case TypeKind::TIMESTAMP: + getFlatValues(rows, result, requestedType); + break; default: VELOX_FAIL( "Not a valid type for integer reader: {}", requestedType->toString()); diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index cf46fdb58184..0fef2732c977 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -345,8 +345,10 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { ? sizeof(float) : sizeof(double); auto numBytes = dictionary_.numValues * typeSize; - if (type_->type()->isShortDecimal() && - parquetType == thrift::Type::INT32) { + if ((type_->type()->isShortDecimal() && + parquetType == thrift::Type::INT32) || + (type_->type()->isTimestamp() && + parquetType == thrift::Type::INT64)) { auto veloxTypeLength = type_->type()->cppSizeInBytes(); auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; dictionary_.values = @@ -373,6 +375,32 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { // We start from the end to allow in-place expansion. values[i] = parquetValues[i]; } + } else if ( + type_->type()->isTimestamp() && parquetType == thrift::Type::INT64) { + VELOX_CHECK(type_->logicalType_.has_value()); + auto logicalType = type_->logicalType_.value(); + if (logicalType.__isset.TIMESTAMP) { + if (!logicalType.TIMESTAMP.isAdjustedToUTC) { + VELOX_NYI("Only UTC adjusted Timestamp is supported."); + } + auto values = dictionary_.values->asMutable(); + auto parquetValues = dictionary_.values->asMutable(); + int64_t units; + + if (logicalType.TIMESTAMP.unit.__isset.MICROS) { + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + memcpy(&units, parquetValues + i * typeSize, typeSize); + values[i] = Timestamp::fromMicros(units); + } + } else if (logicalType.TIMESTAMP.unit.__isset.MILLIS) { + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + memcpy(&units, parquetValues + i * typeSize, typeSize); + values[i] = Timestamp::fromMillis(units); + } + } else { + VELOX_NYI("Nano Timestamp unit is not supported."); + } + } } break; } @@ -679,6 +707,32 @@ void PageReader::makeDecoder() { true); } break; + case thrift::Type::INT64: { + std::optional precisionUnit; + if (type_->logicalType_.has_value() && + type_->logicalType_.value().__isset.TIMESTAMP) { + auto logicalType = type_->logicalType_.value(); + + VELOX_CHECK( + logicalType.TIMESTAMP.isAdjustedToUTC, + "Only UTC adjusted Timestamp is supported."); + VELOX_CHECK( + !logicalType.TIMESTAMP.unit.__isset.NANOS, + "Nano Timestamp unit not supported."); + + precisionUnit = logicalType.TIMESTAMP.unit.__isset.MICROS + ? TimestampPrecision::kMicroseconds + : TimestampPrecision::kMilliseconds; + } + + directDecoder_ = std::make_unique>( + std::make_unique( + pageData_, encodedDataSize_), + false, + parquetTypeBytes(type_->parquetType_.value()), + false, + precisionUnit); + } break; default: { directDecoder_ = std::make_unique>( std::make_unique( diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index e36e305fe943..98cc38650e58 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -551,13 +551,43 @@ std::unique_ptr ReaderBase::getParquetColumnInfo( int32_t precision = schemaElement.__isset.precision ? schemaElement.precision : 0; int32_t scale = schemaElement.__isset.scale ? schemaElement.scale : 0; - int32_t type_length = + int32_t typeLength = schemaElement.__isset.type_length ? schemaElement.type_length : 0; std::vector> children; - const std::optional logicalType_ = + std::optional logicalType_ = schemaElement.__isset.logicalType ? std::optional(schemaElement.logicalType) : std::nullopt; + + if (veloxType->kind() == TypeKind::TIMESTAMP && + schemaElement.type == thrift::Type::INT64 && + !logicalType_.has_value()) { + // Construct logical type from deprecated converted type of Parquet. + thrift::TimestampType timestamp; + timestamp.__set_isAdjustedToUTC(true); + thrift::TimeUnit unit; + + if (schemaElement.converted_type == + thrift::ConvertedType::TIMESTAMP_MICROS) { + thrift::MicroSeconds micros; + unit.__set_MICROS(micros); + } else if ( + schemaElement.converted_type == + thrift::ConvertedType::TIMESTAMP_MILLIS) { + thrift::MilliSeconds millis; + unit.__set_MILLIS(millis); + } else { + VELOX_NYI( + "{} Timestamp unit not supported.", schemaElement.converted_type); + } + + timestamp.__set_unit(unit); + thrift::LogicalType newLogicalType; + newLogicalType.__set_TIMESTAMP(timestamp); + + logicalType_ = std::optional(newLogicalType); + } + auto leafTypePtr = std::make_unique( veloxType, std::move(children), @@ -573,7 +603,7 @@ std::unique_ptr ReaderBase::getParquetColumnInfo( isRepeated, precision, scale, - type_length); + typeLength); if (schemaElement.repetition_type == thrift::FieldRepetitionType::REPEATED) { diff --git a/velox/dwio/parquet/tests/examples/int64_micros_dictionary.parquet b/velox/dwio/parquet/tests/examples/int64_micros_dictionary.parquet new file mode 100644 index 0000000000000000000000000000000000000000..5a8c72053979b8b4983bed7f039c4bfeea2e7a71 GIT binary patch literal 641 zcmZWn!E4h{82^%{BLm|a*oz4SDS?#^Vla!Xr3|rS2N4fL+)0MKB(GXb+Ac|LM(M@F z7~;{x27-8#oxG1Z51vL(-gkC-82tmp`K4BS@Po&D@B4n=@ArF;Z=>{Cqsl+t9$= ziBwp9x645mbm!M)76ix!c|>-9oPW*hBt^YBA_A1(@(4;fK?aaz!*HDS_07#@v)|v_ zJ2*HzJUSYU&dx3_05qA`RPp3^JfXv5V{KugKzL2TvaXzRInCO5CN<^Cz+Q zjD?i&K!hb;6_QrSmKGnpmUVN|tfZQ*>7`RnCxTX)69w%LhVV+N?B|p(Jl0C0K9iey zP@w$J#4HP%iH{R9MbyL%lvg>TDkGlUfs8UOiAcMMmQ7XdL{BZ+rD)Qc__bcJC~kGy zYTmIN%LaA%KSaD8SopN<9!spK)3-vFP@g7L-5MClPL~;VBTk~8ml&qeY_Y(Pjr!w( z(WYTm%02)KQ($(B6B>3g)$RoWbpr%pH#3dDyheR69BvIky+2jG5k|$S&7y7DE0$9% zyS-N67n?QTW4>Km@osr`)AgBG^*qHmfqf>Mm=#j^FE(~ Bv|9iG literal 0 HcmV?d00001 diff --git a/velox/dwio/parquet/tests/examples/int64_micros_plain.parquet b/velox/dwio/parquet/tests/examples/int64_micros_plain.parquet new file mode 100644 index 0000000000000000000000000000000000000000..1cfa7ae4359555e4b22222da097f05575c834c77 GIT binary patch literal 608 zcmZWnL2DC16n>d*+ASz0&>3f74{@QPfjGEb6GISbdJ^$ayh@eX-H8TwlXQ2}ZW{ug zJoyU>BKSW_^{fYff+wv9p=XiSx0`4V?&W*)eQ)0Pyv&@vb(c}82~5|3_w`Sq$dQcoM=3_ zmW)*V@>$})(2=49sIft~k@h3KvHs{y#cC9HStcxD)o?dMG)-6*vldEN#x7Ub38|U) z7ETL$2Alu<7c7Rr7eapk_LWe^-V=Hs@! z2h)o=Bros#KJR@4nT)0m0_Y4~*hrOuhBQ;xPQZ2@B3vajb1xuRAvY3%f6_n-nk_eZ z{MSi^;0URPJw7cmmcKn0{wq%yQYBvlIuudDYv%wT8@6HAH4{Oj1~g+UAQh|F!(m;! zKKMIC8>cwLDv)TpLE$eH;x7fSm3sOQyjC!jwBDHKFO+3Wnxh+^v{=Mc8eWuK(0u+u z6E0Z51k<0EM0{qP3`rsK(ig-gVZ`I0Aj5|xNm)`!)w86qE39sXU`ZxZX&J}Ni)B&B z;)2^`-{FdO^DrxK6L_yM4}mci^_Jf literal 0 HcmV?d00001 diff --git a/velox/dwio/parquet/tests/examples/int64_millis_dictionary.parquet b/velox/dwio/parquet/tests/examples/int64_millis_dictionary.parquet new file mode 100644 index 0000000000000000000000000000000000000000..bd2b55553ca8041c17ff79b5dee81f96aa3c76d4 GIT binary patch literal 641 zcmZWn!E4h{82^%{BLm|a*^3DTDS?#^YFHLqOBrIv4k8|kxRVTdNnW*>v|WkC;+{CqslT2RN_ ziIiJ=x645qbm!M)8U#p#JR-Y4&c9}LlAzuk5dn&Cc?2b#AOpyO~AiSnvSyxUu@{+?NV5W5MO4Oy{^Cyw} zj0KeNK!hb;72;OFmaaW`E$jNESxGcq(+j7ZP6Vwi3Uk^Y4B?eT+0Q6nc&rsfeI_^a zpg{4TiCGr35ej%Bce>1|8BrYe+}J1?%@*^$$f!LY z7;PG)rSt=^Fa@T!D5gOdQ|+GbQ^!XTcC(W4m)EEbhQqBPsQ0I;H-j)gwVAg}Yt^*# zMW@&Dy?nFkxy-YwtL`n=YC0ZsE3WId*+ASz0;!I~?4{@QPfjGEb6GISbdJ^$ayh@eX-H8TwlXQ1uw+#VL zp8N#`5&R#ede(zK!IRd5(6dPE+f6hF_wv2@zBljt-mtsfM-B!0d4-;p=$Bugewr4| zkXWWBz_%q%03a*DQ3)zHo*;FJm2tMTs_co6{BdF$e?-;% zaB^N0(4Fb01sLyM*N<{p9bS}QVIe9+aP#!!{4)Qr?Cz{#1^`L~0}vK7=`q17CmQ#! zBqJ5Sc$)anb)+Z(YHSd0r2R;5TzmMYVp+vqmI+H(HQdb*O%s;Itc4Pmv5OUUOls!c zh10^G!R9~z1&bl@h0q^>eI=B!_k`Yqd{9<1aoBZ?+2DK_Ka*;tFqteu%9lD$`ojS) zyV8{o@cMrp$$lttpZ&2;Wjs8Pkxo^hQpNWsc6vC{cE?WAcS31Aohtv}$Lr8)or literal 0 HcmV?d00001 diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 4261ee702249..aa4da273a580 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -834,6 +834,118 @@ TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) { kSize, [](auto i) { return Timestamp(i, i * 1'001'000); }), }); assertEqualResults({expected}, result.second); + +TEST_F(ParquetTableScanTest, timestampINT64millis) { + std::vector rawData = { + Timestamp(0, 1000000), + Timestamp(-1, 999000000), + Timestamp(1, 0), + Timestamp(-1, 0), + Timestamp(1, 1000000), + Timestamp(-2, 999000000), + Timestamp(0, 999000000), + Timestamp(-1, 1000000), + Timestamp(1000, 0), + Timestamp(-1000, 0), + Timestamp(1000, 1000000), + Timestamp(-1001, 999000000), + Timestamp(99, 999000000), + Timestamp(-100, 1000000)}; + + auto a = + makeFlatVector(60, [&](auto row) { return rawData[row / 4]; }); + + auto expected = makeRowVector({"time"}, {a}); + createDuckDbTable("expected", {expected}); + + auto vector = makeArrayVector({{}}); + + loadData( + getExampleFilePath("int64_millis_dictionary.parquet"), + ROW({"time"}, {TIMESTAMP()}), + makeRowVector( + {"time"}, + { + vector, + })); + assertSelect({"time"}, "SELECT time from expected"); + + loadData( + getExampleFilePath("int64_millis_plain.parquet"), + ROW({"time"}, {TIMESTAMP()}), + makeRowVector( + {"time"}, + { + vector, + })); + assertSelect({"time"}, "SELECT time from expected"); +} + +TEST_F(ParquetTableScanTest, timestampINT64micros) { + std::vector rawData = { + Timestamp(0, 0), + Timestamp(0, 1000), + Timestamp(-1, 999999000), + Timestamp(0, 1000000), + Timestamp(-1, 999000000), + Timestamp(0, 1001000), + Timestamp(-1, 998999000), + Timestamp(0, 999000), + Timestamp(-1, 999001000), + Timestamp(1, 0), + Timestamp(-1, 0), + Timestamp(1, 1000), + Timestamp(-2, 999999000), + Timestamp(0, 99999000), + Timestamp(-1, 900001000)}; + + auto a = + makeFlatVector(60, [&](auto row) { return rawData[row / 4]; }); + + auto expected = makeRowVector({"time"}, {a}); + createDuckDbTable("expected", {expected}); + + auto vector = makeArrayVector({{}}); + + loadData( + getExampleFilePath("int64_micros_dictionary.parquet"), + ROW({"time"}, {TIMESTAMP()}), + makeRowVector( + {"time"}, + { + vector, + })); + assertSelect({"time"}, "SELECT time from expected"); + + loadData( + getExampleFilePath("int64_micros_plain.parquet"), + ROW({"time"}, {TIMESTAMP()}), + makeRowVector( + {"time"}, + { + vector, + })); + assertSelect({"time"}, "SELECT time from expected"); +} + +TEST_F(ParquetTableScanTest, timestampINT64BackwardCompatible) { + auto a = makeFlatVector( + 3, [](auto row) { return Timestamp(0, 10 * 1000000L); }); + + auto expected = makeRowVector({"time"}, {a}); + createDuckDbTable("expected", {expected}); + + auto vector = makeArrayVector({{}}); + + loadData( + getExampleFilePath("int64_millis_compatibility.parquet"), + ROW({"time"}, {TIMESTAMP()}), + makeRowVector( + {"time"}, + { + vector, + })); + assertSelect({"time"}, "SELECT time from expected"); } int main(int argc, char** argv) { From 74fce2d76068b8b113b20fd7c679741cb9a1d961 Mon Sep 17 00:00:00 2001 From: Kapil Singh Date: Mon, 22 Jul 2024 16:38:14 +0530 Subject: [PATCH 2/6] Fix Tmp --- velox/dwio/common/DirectDecoder.h | 27 +-- velox/dwio/common/SelectiveColumnReader.cpp | 3 - velox/dwio/parquet/reader/PageReader.cpp | 58 +----- .../parquet/reader/ParquetColumnReader.cpp | 10 +- .../parquet/reader/TimestampColumnReader.h | 83 ++++++++ .../tests/reader/ParquetTableScanTest.cpp | 180 +++++++++--------- 6 files changed, 188 insertions(+), 173 deletions(-) diff --git a/velox/dwio/common/DirectDecoder.h b/velox/dwio/common/DirectDecoder.h index d30697564b94..4cd9396d0936 100644 --- a/velox/dwio/common/DirectDecoder.h +++ b/velox/dwio/common/DirectDecoder.h @@ -30,10 +30,8 @@ class DirectDecoder : public IntDecoder { std::unique_ptr input, bool useVInts, uint32_t numBytes, - bool bigEndian = false, - std::optional precision = std::nullopt) - : IntDecoder{std::move(input), useVInts, numBytes, bigEndian}, - precision_(precision) {} + bool bigEndian = false) + : IntDecoder{std::move(input), useVInts, numBytes, bigEndian} {} void seekToRowGroup(dwio::common::PositionProvider&) override; @@ -94,24 +92,7 @@ class DirectDecoder : public IntDecoder { } else if constexpr (std::is_same_v< typename Visitor::DataType, int128_t>) { - if (precision_.has_value()) { - auto units = super::template readInt(); - Timestamp timestamp; - if (precision_.value() == TimestampPrecision::kMilliseconds) { - timestamp = Timestamp::fromMillis(units); - } else if (precision_.value() == TimestampPrecision::kMicroseconds) { - timestamp = Timestamp::fromMicros(units); - } else { - VELOX_NYI( - "Unsupported timestamp unit. Only kMillis and kMicros supported."); - } - - int128_t value; - memcpy(&value, ×tamp, sizeof(int128_t)); - toSkip = visitor.process(value, atEnd); - } else { - toSkip = visitor.process(super::template readInt(), atEnd); - } + toSkip = visitor.process(super::template readInt(), atEnd); } else { toSkip = visitor.process(super::template readInt(), atEnd); } @@ -130,8 +111,6 @@ class DirectDecoder : public IntDecoder { private: using super = IntDecoder; - const std::optional precision_; - float readFloat() { float temp; auto buffer = readFixed(sizeof(float), &temp); diff --git a/velox/dwio/common/SelectiveColumnReader.cpp b/velox/dwio/common/SelectiveColumnReader.cpp index fa2ba003fc70..278f09291355 100644 --- a/velox/dwio/common/SelectiveColumnReader.cpp +++ b/velox/dwio/common/SelectiveColumnReader.cpp @@ -241,9 +241,6 @@ void SelectiveColumnReader::getIntValues( VELOX_FAIL("Unsupported value size: {}", valueSize_); } break; - case TypeKind::TIMESTAMP: - getFlatValues(rows, result, requestedType); - break; default: VELOX_FAIL( "Not a valid type for integer reader: {}", requestedType->toString()); diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 0fef2732c977..8c9cba7a0422 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -345,10 +345,8 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { ? sizeof(float) : sizeof(double); auto numBytes = dictionary_.numValues * typeSize; - if ((type_->type()->isShortDecimal() && - parquetType == thrift::Type::INT32) || - (type_->type()->isTimestamp() && - parquetType == thrift::Type::INT64)) { + if (type_->type()->isShortDecimal() && + parquetType == thrift::Type::INT32) { auto veloxTypeLength = type_->type()->cppSizeInBytes(); auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; dictionary_.values = @@ -375,32 +373,6 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { // We start from the end to allow in-place expansion. values[i] = parquetValues[i]; } - } else if ( - type_->type()->isTimestamp() && parquetType == thrift::Type::INT64) { - VELOX_CHECK(type_->logicalType_.has_value()); - auto logicalType = type_->logicalType_.value(); - if (logicalType.__isset.TIMESTAMP) { - if (!logicalType.TIMESTAMP.isAdjustedToUTC) { - VELOX_NYI("Only UTC adjusted Timestamp is supported."); - } - auto values = dictionary_.values->asMutable(); - auto parquetValues = dictionary_.values->asMutable(); - int64_t units; - - if (logicalType.TIMESTAMP.unit.__isset.MICROS) { - for (auto i = dictionary_.numValues - 1; i >= 0; --i) { - memcpy(&units, parquetValues + i * typeSize, typeSize); - values[i] = Timestamp::fromMicros(units); - } - } else if (logicalType.TIMESTAMP.unit.__isset.MILLIS) { - for (auto i = dictionary_.numValues - 1; i >= 0; --i) { - memcpy(&units, parquetValues + i * typeSize, typeSize); - values[i] = Timestamp::fromMillis(units); - } - } else { - VELOX_NYI("Nano Timestamp unit is not supported."); - } - } } break; } @@ -707,32 +679,6 @@ void PageReader::makeDecoder() { true); } break; - case thrift::Type::INT64: { - std::optional precisionUnit; - if (type_->logicalType_.has_value() && - type_->logicalType_.value().__isset.TIMESTAMP) { - auto logicalType = type_->logicalType_.value(); - - VELOX_CHECK( - logicalType.TIMESTAMP.isAdjustedToUTC, - "Only UTC adjusted Timestamp is supported."); - VELOX_CHECK( - !logicalType.TIMESTAMP.unit.__isset.NANOS, - "Nano Timestamp unit not supported."); - - precisionUnit = logicalType.TIMESTAMP.unit.__isset.MICROS - ? TimestampPrecision::kMicroseconds - : TimestampPrecision::kMilliseconds; - } - - directDecoder_ = std::make_unique>( - std::make_unique( - pageData_, encodedDataSize_), - false, - parquetTypeBytes(type_->parquetType_.value()), - false, - precisionUnit); - } break; default: { directDecoder_ = std::make_unique>( std::make_unique( diff --git a/velox/dwio/parquet/reader/ParquetColumnReader.cpp b/velox/dwio/parquet/reader/ParquetColumnReader.cpp index a87a295a787c..d04fdbbccc2f 100644 --- a/velox/dwio/parquet/reader/ParquetColumnReader.cpp +++ b/velox/dwio/parquet/reader/ParquetColumnReader.cpp @@ -75,10 +75,16 @@ std::unique_ptr ParquetColumnReader::build( return std::make_unique( requestedType, fileType, params, scanSpec); - case TypeKind::TIMESTAMP: + case TypeKind::TIMESTAMP: { + auto& parquetFileType = static_cast(*fileType); + auto logicalTypeOpt = parquetFileType.logicalType_; + if (logicalTypeOpt.has_value() && logicalTypeOpt.value().__isset.TIMESTAMP) {return std::make_unique( + requestedType, fileType, params, scanSpec);} + else { return std::make_unique( requestedType, fileType, params, scanSpec); - + } + } default: VELOX_FAIL( "buildReader unhandled type: " + diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h index 11eb00e24286..0e7fa8162de7 100644 --- a/velox/dwio/parquet/reader/TimestampColumnReader.h +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -82,4 +82,87 @@ class TimestampColumnReader : public IntegerColumnReader { TimestampPrecision timestampPrecision_; }; +class TimestampINT64ColumnReader : public IntegerColumnReader { + public: + TimestampINT64ColumnReader( + const TypePtr& requestedType, + std::shared_ptr fileType, + ParquetParams& params, + common::ScanSpec& scanSpec) + : IntegerColumnReader(BIGINT(), fileType, params, scanSpec) { + auto& parquetFileType = static_cast(*fileType_); + auto logicalTypeOpt = parquetFileType.logicalType_; + VELOX_CHECK(logicalTypeOpt.has_value()); + + auto logicalType = logicalTypeOpt.value(); + VELOX_CHECK(logicalType.__isset.TIMESTAMP); + + if (!logicalType.TIMESTAMP.isAdjustedToUTC) { + VELOX_NYI("Only UTC adjusted Timestamp is supported."); + } + + if (logicalType.TIMESTAMP.unit.__isset.MICROS) { + sourcePrecision_ = TimestampPrecision::kMicroseconds; + } else if (logicalType.TIMESTAMP.unit.__isset.MILLIS) { + sourcePrecision_ = TimestampPrecision::kMilliseconds; + } else { + VELOX_NYI("Nano Timestamp unit is not supported."); + } + } + + bool hasBulkPath() const override { + return false; + } + + void getValues(RowSet rows, VectorPtr* result) override { + // Upcast to int128_t here so we have enough memory already in vector to + // hold Timestamp (16bit) vs int64_t (8bit) + getFlatValues(rows, result, requestedType_); + + VectorPtr resultVector = *result; + auto intValues = resultVector->asUnchecked>(); + + auto rawValues = + resultVector->asUnchecked>()->mutableRawValues(); + + Timestamp timestamp; + for (vector_size_t i = 0; i < numValues_; ++i) { + if (intValues->isNullAt(i)) + continue; + + const auto timestampInt = intValues->valueAt(i); + std::cout << static_cast(timestampInt) << std::endl; + Timestamp timestamp; + if (sourcePrecision_ == TimestampPrecision::kMicroseconds) { + timestamp = Timestamp::fromMicros(timestampInt); + } else { + timestamp = Timestamp::fromMillis(timestampInt); + } + + memcpy(&rawValues[i], ×tamp, sizeof(int128_t)); + } + + *result = std::make_shared>( + &memoryPool_, + TIMESTAMP(), + resultNulls(), + numValues_, + intValues->values(), + std::move(stringBuffers_)); + } + + void read( + vector_size_t offset, + RowSet rows, + const uint64_t* /*incomingNulls*/) override { + auto& data = formatData_->as(); + // Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length. + prepareRead(offset, rows, nullptr); + readCommon(rows); + readOffset_ += rows.back() + 1; + } + + private: + TimestampPrecision sourcePrecision_; +}; } // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index aa4da273a580..20c4cb7bd1d7 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -221,6 +221,88 @@ class ParquetTableScanTest : public HiveConnectorTestBase { writer->close(); } + void testTimestampINT64( + const std::string& fileName, + TimestampPrecision precision) { + std::vector rawDataMicros = { + Timestamp(0, 0), + Timestamp(0, 1000), + Timestamp(-1, 999999000), + Timestamp(0, 1000000), + Timestamp(-1, 999000000), + Timestamp(0, 1001000), + Timestamp(-1, 998999000), + Timestamp(0, 999000), + Timestamp(-1, 999001000), + Timestamp(1, 0), + Timestamp(-1, 0), + Timestamp(1, 1000), + Timestamp(-2, 999999000), + Timestamp(0, 99999000), + Timestamp(-1, 900001000)}; + + std::vector rawDataMillis = { + Timestamp(0, 0), + Timestamp(0, 1000000), + Timestamp(-1, 999000000), + Timestamp(1, 0), + Timestamp(-1, 0), + Timestamp(1, 1000000), + Timestamp(-2, 999000000), + Timestamp(0, 999000000), + Timestamp(-1, 1000000), + Timestamp(1000, 0), + Timestamp(-1000, 0), + Timestamp(1000, 1000000), + Timestamp(-1001, 999000000), + Timestamp(99, 999000000), + Timestamp(-100, 1000000)}; + + auto a = makeFlatVector(60, [&](auto row) { + return (precision == TimestampPrecision::kMilliseconds) + ? rawDataMillis[row / 4] + : rawDataMicros[row / 4]; + }); + + auto expected = makeRowVector({"time"}, {a}); + auto schema = asRowType(expected->type()); + auto plan = PlanBuilder().tableScan(schema).planNode(); + + CursorParameters params; + std::shared_ptr executor = + std::make_shared( + std::thread::hardware_concurrency()); + std::shared_ptr queryCtx = + core::QueryCtx::create(executor.get()); + std::unordered_map session = { + {std::string(connector::hive::HiveConfig::kReadTimestampUnitSession), + (precision == TimestampPrecision::kMilliseconds) ? "3" : "6"}}; + queryCtx->setConnectorSessionOverridesUnsafe( + kHiveConnectorId, std::move(session)); + params.queryCtx = queryCtx; + params.planNode = plan; + const int numSplitsPerFile = 1; + + bool noMoreSplits = false; + auto addSplits = [&](exec::Task* task) { + if (!noMoreSplits) { + auto const splits = HiveConnectorTestBase::makeHiveConnectorSplits( + {getExampleFilePath(fileName)}, + numSplitsPerFile, + dwio::common::FileFormat::PARQUET); + for (const auto& split : splits) { + task->addSplit("0", exec::Split(split)); + } + task->noMoreSplits("0"); + } + noMoreSplits = true; + }; + + auto result = readCursor(params, addSplits); + ASSERT_TRUE(waitForTaskCompletion(result.first->task().get())); + assertEqualResults({expected}, result.second); + } + private: RowTypePtr getRowType(std::vector&& outputColumnNames) const { std::vector types; @@ -834,98 +916,20 @@ TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) { kSize, [](auto i) { return Timestamp(i, i * 1'001'000); }), }); assertEqualResults({expected}, result.second); - -TEST_F(ParquetTableScanTest, timestampINT64millis) { - std::vector rawData = { - Timestamp(0, 1000000), - Timestamp(-1, 999000000), - Timestamp(1, 0), - Timestamp(-1, 0), - Timestamp(1, 1000000), - Timestamp(-2, 999000000), - Timestamp(0, 999000000), - Timestamp(-1, 1000000), - Timestamp(1000, 0), - Timestamp(-1000, 0), - Timestamp(1000, 1000000), - Timestamp(-1001, 999000000), - Timestamp(99, 999000000), - Timestamp(-100, 1000000)}; - - auto a = - makeFlatVector(60, [&](auto row) { return rawData[row / 4]; }); - - auto expected = makeRowVector({"time"}, {a}); - createDuckDbTable("expected", {expected}); - - auto vector = makeArrayVector({{}}); - - loadData( - getExampleFilePath("int64_millis_dictionary.parquet"), - ROW({"time"}, {TIMESTAMP()}), - makeRowVector( - {"time"}, - { - vector, - })); - assertSelect({"time"}, "SELECT time from expected"); +} - loadData( - getExampleFilePath("int64_millis_plain.parquet"), - ROW({"time"}, {TIMESTAMP()}), - makeRowVector( - {"time"}, - { - vector, - })); - assertSelect({"time"}, "SELECT time from expected"); +TEST_F(ParquetTableScanTest, timestampINT64millis) { + testTimestampINT64( + "int64_millis_dictionary.parquet", TimestampPrecision::kMilliseconds); + //testTimestampINT64( + // "int64_millis_plain.parquet", TimestampPrecision::kMilliseconds); } TEST_F(ParquetTableScanTest, timestampINT64micros) { - std::vector rawData = { - Timestamp(0, 0), - Timestamp(0, 1000), - Timestamp(-1, 999999000), - Timestamp(0, 1000000), - Timestamp(-1, 999000000), - Timestamp(0, 1001000), - Timestamp(-1, 998999000), - Timestamp(0, 999000), - Timestamp(-1, 999001000), - Timestamp(1, 0), - Timestamp(-1, 0), - Timestamp(1, 1000), - Timestamp(-2, 999999000), - Timestamp(0, 99999000), - Timestamp(-1, 900001000)}; - - auto a = - makeFlatVector(60, [&](auto row) { return rawData[row / 4]; }); - - auto expected = makeRowVector({"time"}, {a}); - createDuckDbTable("expected", {expected}); - - auto vector = makeArrayVector({{}}); - - loadData( - getExampleFilePath("int64_micros_dictionary.parquet"), - ROW({"time"}, {TIMESTAMP()}), - makeRowVector( - {"time"}, - { - vector, - })); - assertSelect({"time"}, "SELECT time from expected"); - - loadData( - getExampleFilePath("int64_micros_plain.parquet"), - ROW({"time"}, {TIMESTAMP()}), - makeRowVector( - {"time"}, - { - vector, - })); - assertSelect({"time"}, "SELECT time from expected"); + testTimestampINT64( + "int64_micros_dictionary.parquet", TimestampPrecision::kMicroseconds); + testTimestampINT64( + "int64_micros_plain.parquet", TimestampPrecision::kMicroseconds); } TEST_F(ParquetTableScanTest, timestampINT64BackwardCompatible) { From f4764d399df0f365d1cb204badd3590345e77939 Mon Sep 17 00:00:00 2001 From: Kapil Singh Date: Wed, 7 Aug 2024 17:08:59 +0530 Subject: [PATCH 3/6] Refactor --- velox/dwio/parquet/reader/PageReader.cpp | 2 +- .../parquet/reader/ParquetColumnReader.cpp | 15 +- .../parquet/reader/TimestampColumnReader.h | 169 ++++++++++++++---- .../tests/reader/ParquetTableScanTest.cpp | 76 +++++++- 4 files changed, 219 insertions(+), 43 deletions(-) diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 8c9cba7a0422..cf46fdb58184 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -346,7 +346,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { : sizeof(double); auto numBytes = dictionary_.numValues * typeSize; if (type_->type()->isShortDecimal() && - parquetType == thrift::Type::INT32) { + parquetType == thrift::Type::INT32) { auto veloxTypeLength = type_->type()->cppSizeInBytes(); auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; dictionary_.values = diff --git a/velox/dwio/parquet/reader/ParquetColumnReader.cpp b/velox/dwio/parquet/reader/ParquetColumnReader.cpp index d04fdbbccc2f..0266ac568a5a 100644 --- a/velox/dwio/parquet/reader/ParquetColumnReader.cpp +++ b/velox/dwio/parquet/reader/ParquetColumnReader.cpp @@ -77,12 +77,15 @@ std::unique_ptr ParquetColumnReader::build( case TypeKind::TIMESTAMP: { auto& parquetFileType = static_cast(*fileType); - auto logicalTypeOpt = parquetFileType.logicalType_; - if (logicalTypeOpt.has_value() && logicalTypeOpt.value().__isset.TIMESTAMP) {return std::make_unique( - requestedType, fileType, params, scanSpec);} - else { - return std::make_unique( - requestedType, fileType, params, scanSpec); + auto logicalTypeOpt = parquetFileType.logicalType_; + if (logicalTypeOpt.has_value() && + logicalTypeOpt.value().__isset.TIMESTAMP && + parquetFileType.parquetType_ == thrift::Type::INT64) { + return std::make_unique( + requestedType, fileType, params, scanSpec); + } else { + return std::make_unique( + requestedType, fileType, params, scanSpec); } } default: diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h index 0e7fa8162de7..58f36ea73d35 100644 --- a/velox/dwio/parquet/reader/TimestampColumnReader.h +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -21,9 +21,9 @@ namespace facebook::velox::parquet { -class TimestampColumnReader : public IntegerColumnReader { +class TimestampINT96ColumnReader : public IntegerColumnReader { public: - TimestampColumnReader( + TimestampINT96ColumnReader( const TypePtr& requestedType, std::shared_ptr fileType, ParquetParams& params, @@ -89,7 +89,8 @@ class TimestampINT64ColumnReader : public IntegerColumnReader { std::shared_ptr fileType, ParquetParams& params, common::ScanSpec& scanSpec) - : IntegerColumnReader(BIGINT(), fileType, params, scanSpec) { + : IntegerColumnReader(requestedType, fileType, params, scanSpec), + timestampPrecision_(params.timestampPrecision()) { auto& parquetFileType = static_cast(*fileType_); auto logicalTypeOpt = parquetFileType.logicalType_; VELOX_CHECK(logicalTypeOpt.has_value()); @@ -102,9 +103,9 @@ class TimestampINT64ColumnReader : public IntegerColumnReader { } if (logicalType.TIMESTAMP.unit.__isset.MICROS) { - sourcePrecision_ = TimestampPrecision::kMicroseconds; + parquetTimestampPrecision_ = TimestampPrecision::kMicroseconds; } else if (logicalType.TIMESTAMP.unit.__isset.MILLIS) { - sourcePrecision_ = TimestampPrecision::kMilliseconds; + parquetTimestampPrecision_ = TimestampPrecision::kMilliseconds; } else { VELOX_NYI("Nano Timestamp unit is not supported."); } @@ -114,41 +115,87 @@ class TimestampINT64ColumnReader : public IntegerColumnReader { return false; } - void getValues(RowSet rows, VectorPtr* result) override { - // Upcast to int128_t here so we have enough memory already in vector to - // hold Timestamp (16bit) vs int64_t (8bit) - getFlatValues(rows, result, requestedType_); - - VectorPtr resultVector = *result; - auto intValues = resultVector->asUnchecked>(); + void + processNulls(const bool isNull, const RowSet rows, const uint64_t* rawNulls) { + if (!rawNulls) { + return; + } + auto rawTs = values_->asMutable(); - auto rawValues = - resultVector->asUnchecked>()->mutableRawValues(); + returnReaderNulls_ = false; + anyNulls_ = !isNull; + allNull_ = isNull; + vector_size_t idx = 0; + for (vector_size_t i = 0; i < numValues_; i++) { + if (isNull) { + if (bits::isBitNull(rawNulls, i)) { + bits::setNull(rawResultNulls_, idx); + addOutputRow(rows[i]); + idx++; + } + } else { + if (!bits::isBitNull(rawNulls, i)) { + bits::setNull(rawResultNulls_, idx, false); + rawTs[idx] = rawTs[i]; + addOutputRow(rows[i]); + idx++; + } + } + } + } - Timestamp timestamp; - for (vector_size_t i = 0; i < numValues_; ++i) { - if (intValues->isNullAt(i)) - continue; + void processFilter( + const common::Filter* filter, + const RowSet rows, + const uint64_t* rawNulls) { + auto rawTs = values_->asMutable(); - const auto timestampInt = intValues->valueAt(i); - std::cout << static_cast(timestampInt) << std::endl; - Timestamp timestamp; - if (sourcePrecision_ == TimestampPrecision::kMicroseconds) { - timestamp = Timestamp::fromMicros(timestampInt); + returnReaderNulls_ = false; + anyNulls_ = false; + allNull_ = true; + vector_size_t idx = 0; + for (vector_size_t i = 0; i < numValues_; i++) { + if (rawNulls && bits::isBitNull(rawNulls, i)) { + if (filter->testNull()) { + bits::setNull(rawResultNulls_, idx); + addOutputRow(rows[i]); + anyNulls_ = true; + idx++; + } } else { - timestamp = Timestamp::fromMillis(timestampInt); + if (filter->testTimestamp(rawTs[i])) { + if (rawNulls) { + bits::setNull(rawResultNulls_, idx, false); + } + rawTs[idx] = rawTs[i]; + addOutputRow(rows[i]); + allNull_ = false; + idx++; + } } + } + } + + void getValues(RowSet rows, VectorPtr* result) override { + getFlatValues(rows, result, requestedType_); + } - memcpy(&rawValues[i], ×tamp, sizeof(int128_t)); + Timestamp adjustToPrecision( + const Timestamp& timestamp, + TimestampPrecision precision) { + auto nano = timestamp.getNanos(); + switch (precision) { + case TimestampPrecision::kMilliseconds: + nano = nano / 1'000'000 * 1'000'000; + break; + case TimestampPrecision::kMicroseconds: + nano = nano / 1'000 * 1'000; + break; + case TimestampPrecision::kNanoseconds: + break; } - *result = std::make_shared>( - &memoryPool_, - TIMESTAMP(), - resultNulls(), - numValues_, - intValues->values(), - std::move(stringBuffers_)); + return Timestamp(timestamp.getSeconds(), nano); } void read( @@ -156,13 +203,67 @@ class TimestampINT64ColumnReader : public IntegerColumnReader { RowSet rows, const uint64_t* /*incomingNulls*/) override { auto& data = formatData_->as(); - // Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length. prepareRead(offset, rows, nullptr); + + // Remove filter so that we can do filtering here once data is represented + // as timestamp type + const auto filter = scanSpec_->filter()->clone(); + scanSpec_->setFilter(nullptr); + readCommon(rows); + + auto tsValues = + AlignedBuffer::allocate(numValues_, &memoryPool_); + auto rawTs = tsValues->asMutable(); + auto rawTsInt64 = values_->asMutable(); + const auto rawNulls = + resultNulls() ? resultNulls()->as() : nullptr; + + Timestamp timestamp; + for (vector_size_t i = 0; i < numValues_; i++) { + if (!rawNulls || !bits::isBitNull(rawNulls, i)) { + if (parquetTimestampPrecision_ == TimestampPrecision::kMicroseconds) { + timestamp = Timestamp::fromMicros(rawTsInt64[i]); + } else { + timestamp = Timestamp::fromMillis(rawTsInt64[i]); + } + + rawTs[i] = adjustToPrecision(timestamp, timestampPrecision_); + } + } + values_ = tsValues; + rawValues_ = values_->asMutable(); + + switch ( + !filter || + (filter->kind() == common::FilterKind::kIsNotNull && !rawNulls) + ? common::FilterKind::kAlwaysTrue + : filter->kind()) { + case common::FilterKind::kAlwaysTrue: + // Simply add all rows to output. + for (vector_size_t i = 0; i < numValues_; i++) { + addOutputRow(rows[i]); + } + break; + case common::FilterKind::kIsNull: + processNulls(true, rows, rawNulls); + break; + case common::FilterKind::kIsNotNull: + processNulls(false, rows, rawNulls); + break; + case common::FilterKind::kTimestampRange: + case common::FilterKind::kMultiRange: + processFilter(filter.get(), rows, rawNulls); + break; + default: + VELOX_UNSUPPORTED("Unsupported filter."); + } + readOffset_ += rows.back() + 1; } private: - TimestampPrecision sourcePrecision_; + TimestampPrecision parquetTimestampPrecision_; + TimestampPrecision timestampPrecision_; }; } // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 20c4cb7bd1d7..62c0d57978ef 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -921,8 +921,8 @@ TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) { TEST_F(ParquetTableScanTest, timestampINT64millis) { testTimestampINT64( "int64_millis_dictionary.parquet", TimestampPrecision::kMilliseconds); - //testTimestampINT64( - // "int64_millis_plain.parquet", TimestampPrecision::kMilliseconds); + testTimestampINT64( + "int64_millis_plain.parquet", TimestampPrecision::kMilliseconds); } TEST_F(ParquetTableScanTest, timestampINT64micros) { @@ -932,6 +932,78 @@ TEST_F(ParquetTableScanTest, timestampINT64micros) { "int64_micros_plain.parquet", TimestampPrecision::kMicroseconds); } +TEST_F(ParquetTableScanTest, timestampINT64Filter) { + std::vector rawDataMillis = { + Timestamp(0, 0), + Timestamp(0, 1000000), + Timestamp(-1, 999000000), + Timestamp(1, 0), + Timestamp(-1, 0), + Timestamp(1, 1000000), + Timestamp(-2, 999000000), + Timestamp(0, 999000000), + Timestamp(-1, 1000000), + Timestamp(1000, 0), + Timestamp(-1000, 0), + Timestamp(1000, 1000000), + Timestamp(-1001, 999000000), + Timestamp(99, 999000000), + Timestamp(-100, 1000000)}; + + auto a = makeFlatVector( + 60, [&](auto row) { return rawDataMillis[row / 4]; }); + + auto expected = makeRowVector({"time"}, {a}); + createDuckDbTable("expected", {expected}); + + auto vector = makeArrayVector({{}}); + + loadData( + getExampleFilePath("int64_millis_dictionary.parquet"), + ROW({"time"}, {TIMESTAMP()}), + makeRowVector( + {"time"}, + { + vector, + })); + + assertSelectWithFilter( + {"time"}, + {}, + "time < TIMESTAMP '1970-01-01 00:00:00'", + "SELECT time from expected where time < TIMESTAMP '1970-01-01 00:00:00'"); + + assertSelectWithFilter( + {"time"}, + {}, + "time <= TIMESTAMP '1970-01-01 00:00:00'", + "SELECT time from expected where time <= TIMESTAMP '1970-01-01 00:00:00'"); + + assertSelectWithFilter( + {"time"}, + {}, + "time > TIMESTAMP '1970-01-01 00:00:00'", + "SELECT time from expected where time > TIMESTAMP '1970-01-01 00:00:00'"); + + assertSelectWithFilter( + {"time"}, + {}, + "time >= TIMESTAMP '1970-01-01 00:00:00'", + "SELECT time from expected where time >= TIMESTAMP '1970-01-01 00:00:00'"); + + assertSelectWithFilter( + {"time"}, + {}, + "time == TIMESTAMP '1970-01-01 00:00:00'", + "SELECT time from expected where time == TIMESTAMP '1970-01-01 00:00:00'"); + + assertSelectWithFilter( + {"time"}, + {}, + "time != TIMESTAMP '1970-01-01 00:00:00'", + "SELECT time from expected where time != TIMESTAMP '1970-01-01 00:00:00'"); +} + TEST_F(ParquetTableScanTest, timestampINT64BackwardCompatible) { auto a = makeFlatVector( 3, [](auto row) { return Timestamp(0, 10 * 1000000L); }); From a2f6ade23cd37267a3e8d40f532dfb88c6e01120 Mon Sep 17 00:00:00 2001 From: Kapil Singh Date: Wed, 28 Aug 2024 17:10:37 +0530 Subject: [PATCH 4/6] PR Comment Add E2E test --- .../common/tests/utils/DataSetBuilder.cpp | 30 +++++++ .../dwio/common/tests/utils/DataSetBuilder.h | 5 ++ .../parquet/reader/ParquetColumnReader.cpp | 4 +- velox/dwio/parquet/reader/ParquetReader.cpp | 60 +++++++------ .../parquet/reader/TimestampColumnReader.h | 90 ++++++++----------- .../parquet/tests/reader/E2EFilterTest.cpp | 76 ++++++++++++++++ velox/type/Timestamp.h | 13 +++ 7 files changed, 196 insertions(+), 82 deletions(-) diff --git a/velox/dwio/common/tests/utils/DataSetBuilder.cpp b/velox/dwio/common/tests/utils/DataSetBuilder.cpp index 07b6b245003f..d42d8033677e 100644 --- a/velox/dwio/common/tests/utils/DataSetBuilder.cpp +++ b/velox/dwio/common/tests/utils/DataSetBuilder.cpp @@ -316,6 +316,36 @@ DataSetBuilder& DataSetBuilder::makeMapStringValues( return *this; } +void DataSetBuilder::adjustTimestampToPrecision( + VectorPtr batch, + TimestampPrecision precision) { + auto type = batch->type(); + + if (type->kind() == TypeKind::TIMESTAMP) { + auto rawValues = + batch->asUnchecked>()->mutableRawValues(); + for (auto i = 0; i < batch->size(); ++i) { + if (batch->isNullAt(i)) { + continue; + } + + rawValues[i].toPrecision(precision); + } + } else if (type->kind() == TypeKind::ROW) { + for (auto& child : batch->as()->children()) { + adjustTimestampToPrecision(child, precision); + } + } +} + +DataSetBuilder& DataSetBuilder::adjustTimestampToPrecision( + TimestampPrecision precision) { + for (auto& batch : *batches_) { + adjustTimestampToPrecision(batch, precision); + } + return *this; +} + std::unique_ptr> DataSetBuilder::build() { return std::move(batches_); } diff --git a/velox/dwio/common/tests/utils/DataSetBuilder.h b/velox/dwio/common/tests/utils/DataSetBuilder.h index 4893c28336f6..0ffb7e017b32 100644 --- a/velox/dwio/common/tests/utils/DataSetBuilder.h +++ b/velox/dwio/common/tests/utils/DataSetBuilder.h @@ -50,6 +50,11 @@ class DataSetBuilder { // groups. Tests skipping row groups based on row group stats. DataSetBuilder& withRowGroupSpecificData(int32_t numRowsPerGroup); + DataSetBuilder& adjustTimestampToPrecision(TimestampPrecision precision); + void adjustTimestampToPrecision( + VectorPtr batch, + TimestampPrecision precision); + // Makes all data in 'batches_' after firstRow non-null. This finds a sampling // of non-null values from each column and replaces nulls in the column in // question with one of these. A column where only nulls are found in sampling diff --git a/velox/dwio/parquet/reader/ParquetColumnReader.cpp b/velox/dwio/parquet/reader/ParquetColumnReader.cpp index 0266ac568a5a..641c7147da25 100644 --- a/velox/dwio/parquet/reader/ParquetColumnReader.cpp +++ b/velox/dwio/parquet/reader/ParquetColumnReader.cpp @@ -81,10 +81,10 @@ std::unique_ptr ParquetColumnReader::build( if (logicalTypeOpt.has_value() && logicalTypeOpt.value().__isset.TIMESTAMP && parquetFileType.parquetType_ == thrift::Type::INT64) { - return std::make_unique( + return std::make_unique( requestedType, fileType, params, scanSpec); } else { - return std::make_unique( + return std::make_unique( requestedType, fileType, params, scanSpec); } } diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 98cc38650e58..77f44a985896 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -105,6 +105,11 @@ class ReaderBase { TypePtr convertType( const thrift::SchemaElement& schemaElement, const TypePtr& requestedType) const; + + thrift::LogicalType getTimestampLogicalType( + thrift::ConvertedType::type type) const; + + TypePtr convertType(const thrift::SchemaElement& schemaElement) const; template static std::shared_ptr createRowType( @@ -554,38 +559,14 @@ std::unique_ptr ReaderBase::getParquetColumnInfo( int32_t typeLength = schemaElement.__isset.type_length ? schemaElement.type_length : 0; std::vector> children; - std::optional logicalType_ = + std::optional logicalType = schemaElement.__isset.logicalType ? std::optional(schemaElement.logicalType) : std::nullopt; if (veloxType->kind() == TypeKind::TIMESTAMP && - schemaElement.type == thrift::Type::INT64 && - !logicalType_.has_value()) { - // Construct logical type from deprecated converted type of Parquet. - thrift::TimestampType timestamp; - timestamp.__set_isAdjustedToUTC(true); - thrift::TimeUnit unit; - - if (schemaElement.converted_type == - thrift::ConvertedType::TIMESTAMP_MICROS) { - thrift::MicroSeconds micros; - unit.__set_MICROS(micros); - } else if ( - schemaElement.converted_type == - thrift::ConvertedType::TIMESTAMP_MILLIS) { - thrift::MilliSeconds millis; - unit.__set_MILLIS(millis); - } else { - VELOX_NYI( - "{} Timestamp unit not supported.", schemaElement.converted_type); - } - - timestamp.__set_unit(unit); - thrift::LogicalType newLogicalType; - newLogicalType.__set_TIMESTAMP(timestamp); - - logicalType_ = std::optional(newLogicalType); + schemaElement.type == thrift::Type::INT64 && !logicalType.has_value()) { + logicalType = getTimestampLogicalType(schemaElement.converted_type); } auto leafTypePtr = std::make_unique( @@ -596,7 +577,7 @@ std::unique_ptr ReaderBase::getParquetColumnInfo( columnIdx++, name, schemaElement.type, - logicalType_, + logicalType, maxRepeat, maxDefine, isOptional, @@ -632,6 +613,29 @@ std::unique_ptr ReaderBase::getParquetColumnInfo( return nullptr; } +thrift::LogicalType ReaderBase::getTimestampLogicalType( + thrift::ConvertedType::type convertedType) const { + thrift::TimestampType timestamp; + timestamp.__set_isAdjustedToUTC(true); + thrift::TimeUnit unit; + + if (convertedType == thrift::ConvertedType::TIMESTAMP_MICROS) { + thrift::MicroSeconds micros; + unit.__set_MICROS(micros); + } else if (convertedType == thrift::ConvertedType::TIMESTAMP_MILLIS) { + thrift::MilliSeconds millis; + unit.__set_MILLIS(millis); + } else { + VELOX_NYI("{} Timestamp unit not supported.", convertedType); + } + + timestamp.__set_unit(unit); + thrift::LogicalType logicalType; + logicalType.__set_TIMESTAMP(timestamp); + + return logicalType; +} + TypePtr ReaderBase::convertType( const thrift::SchemaElement& schemaElement, const TypePtr& requestedType) const { diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h index 58f36ea73d35..80c7cf215b75 100644 --- a/velox/dwio/parquet/reader/TimestampColumnReader.h +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -21,9 +21,9 @@ namespace facebook::velox::parquet { -class TimestampINT96ColumnReader : public IntegerColumnReader { +class TimestampInt96ColumnReader : public IntegerColumnReader { public: - TimestampINT96ColumnReader( + TimestampInt96ColumnReader( const TypePtr& requestedType, std::shared_ptr fileType, ParquetParams& params, @@ -49,19 +49,7 @@ class TimestampINT96ColumnReader : public IntegerColumnReader { if (resultVector->isNullAt(i)) { continue; } - const auto timestamp = rawValues[i]; - uint64_t nanos = timestamp.getNanos(); - switch (timestampPrecision_) { - case TimestampPrecision::kMilliseconds: - nanos = nanos / 1'000'000 * 1'000'000; - break; - case TimestampPrecision::kMicroseconds: - nanos = nanos / 1'000 * 1'000; - break; - case TimestampPrecision::kNanoseconds: - break; - } - rawValues[i] = Timestamp(timestamp.getSeconds(), nanos); + rawValues[i].toPrecision(timestampPrecision_); } } @@ -82,9 +70,9 @@ class TimestampINT96ColumnReader : public IntegerColumnReader { TimestampPrecision timestampPrecision_; }; -class TimestampINT64ColumnReader : public IntegerColumnReader { +class TimestampInt64ColumnReader : public IntegerColumnReader { public: - TimestampINT64ColumnReader( + TimestampInt64ColumnReader( const TypePtr& requestedType, std::shared_ptr fileType, ParquetParams& params, @@ -112,7 +100,7 @@ class TimestampINT64ColumnReader : public IntegerColumnReader { } bool hasBulkPath() const override { - return false; + return true; } void @@ -180,42 +168,23 @@ class TimestampINT64ColumnReader : public IntegerColumnReader { getFlatValues(rows, result, requestedType_); } - Timestamp adjustToPrecision( - const Timestamp& timestamp, - TimestampPrecision precision) { - auto nano = timestamp.getNanos(); - switch (precision) { - case TimestampPrecision::kMilliseconds: - nano = nano / 1'000'000 * 1'000'000; - break; - case TimestampPrecision::kMicroseconds: - nano = nano / 1'000 * 1'000; - break; - case TimestampPrecision::kNanoseconds: - break; - } - - return Timestamp(timestamp.getSeconds(), nano); - } - - void read( - vector_size_t offset, - RowSet rows, - const uint64_t* /*incomingNulls*/) override { - auto& data = formatData_->as(); - prepareRead(offset, rows, nullptr); - - // Remove filter so that we can do filtering here once data is represented - // as timestamp type - const auto filter = scanSpec_->filter()->clone(); - scanSpec_->setFilter(nullptr); - - readCommon(rows); + template + void readHelper(common::Filter* filter, RowSet rows) { + dwio::common::ExtractToReader extractValues(this); + common::AlwaysTrue alwaysTrue; + dwio::common::ColumnVisitor< + int64_t, + common::AlwaysTrue, + decltype(extractValues), + isDense> + visitor(alwaysTrue, this, rows, extractValues); + readWithVisitor(rows, visitor); auto tsValues = AlignedBuffer::allocate(numValues_, &memoryPool_); auto rawTs = tsValues->asMutable(); auto rawTsInt64 = values_->asMutable(); + const auto rawNulls = resultNulls() ? resultNulls()->as() : nullptr; @@ -227,12 +196,14 @@ class TimestampINT64ColumnReader : public IntegerColumnReader { } else { timestamp = Timestamp::fromMillis(rawTsInt64[i]); } - - rawTs[i] = adjustToPrecision(timestamp, timestampPrecision_); + rawTs[i] = timestamp; + rawTs[i].toPrecision(timestampPrecision_); } } + values_ = tsValues; rawValues_ = values_->asMutable(); + outputRows_.clear(); switch ( !filter || @@ -253,11 +224,26 @@ class TimestampINT64ColumnReader : public IntegerColumnReader { break; case common::FilterKind::kTimestampRange: case common::FilterKind::kMultiRange: - processFilter(filter.get(), rows, rawNulls); + processFilter(filter, rows, rawNulls); break; default: VELOX_UNSUPPORTED("Unsupported filter."); } + } + + void read( + vector_size_t offset, + RowSet rows, + const uint64_t* /*incomingNulls*/) override { + auto& data = formatData_->as(); + prepareRead(offset, rows, nullptr); + + bool isDense = rows.back() == rows.size() - 1; + if (isDense) { + readHelper(scanSpec_->filter(), rows); + } else { + readHelper(scanSpec_->filter(), rows); + } readOffset_ += rows.back() + 1; } diff --git a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp index 0b27395f9a00..d248f1de6294 100644 --- a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp +++ b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp @@ -269,6 +269,82 @@ TEST_F(E2EFilterTest, timestampDictionary) { 20); } +TEST_F(E2EFilterTest, timestampINT64MillisDictionary) { + options_.enableDictionary = true; + options_.dataPageSize = 4 * 1024; + options_.parquetWriteTimestampUnit = + static_cast(TimestampUnit::kMilli); + options_.parquetWriteTimestampTimeZone = "utc"; + + testWithTypes( + "timestamp_val_0:timestamp," + "timestamp_val_1:timestamp", + [&]() { + dataSetBuilder_->adjustTimestampToPrecision( + TimestampPrecision::kMilliseconds); + }, + true, + {"timestamp_val_0", "timestamp_val_1"}, + 1); +} + +TEST_F(E2EFilterTest, timestampINT64MillisPlain) { + options_.enableDictionary = false; + options_.dataPageSize = 4 * 1024; + options_.parquetWriteTimestampUnit = + static_cast(TimestampUnit::kMilli); + options_.parquetWriteTimestampTimeZone = "utc"; + + testWithTypes( + "timestamp_val_0:timestamp," + "timestamp_val_1:timestamp", + [&]() { + dataSetBuilder_->adjustTimestampToPrecision( + TimestampPrecision::kMilliseconds); + }, + true, + {"timestamp_val_0", "timestamp_val_1"}, + 1); +} + +TEST_F(E2EFilterTest, timestampINT64MicrosDictionary) { + options_.enableDictionary = true; + options_.dataPageSize = 4 * 1024; + options_.parquetWriteTimestampUnit = + static_cast(TimestampUnit::kMicro); + options_.parquetWriteTimestampTimeZone = "utc"; + + testWithTypes( + "timestamp_val_0:timestamp," + "timestamp_val_1:timestamp", + [&]() { + dataSetBuilder_->adjustTimestampToPrecision( + TimestampPrecision::kMicroseconds); + }, + true, + {"timestamp_val_0", "timestamp_val_1"}, + 1); +} + +TEST_F(E2EFilterTest, timestampINT64MicrosPlain) { + options_.enableDictionary = false; + options_.dataPageSize = 4 * 1024; + options_.parquetWriteTimestampUnit = + static_cast(TimestampUnit::kMicro); + options_.parquetWriteTimestampTimeZone = "utc"; + + testWithTypes( + "timestamp_val_0:timestamp," + "timestamp_val_1:timestamp", + [&]() { + dataSetBuilder_->adjustTimestampToPrecision( + TimestampPrecision::kMicroseconds); + }, + true, + {"timestamp_val_0", "timestamp_val_1"}, + 1); +} + TEST_F(E2EFilterTest, floatAndDoubleDirect) { options_.enableDictionary = false; options_.dataPageSize = 4 * 1024; diff --git a/velox/type/Timestamp.h b/velox/type/Timestamp.h index 44117bc7886b..57cec6ca5f3b 100644 --- a/velox/type/Timestamp.h +++ b/velox/type/Timestamp.h @@ -195,6 +195,19 @@ struct Timestamp { } } + void toPrecision(const TimestampPrecision& precision) { + switch (precision) { + case TimestampPrecision::kMilliseconds: + nanos_ = nanos_ / 1'000'000 * 1'000'000; + break; + case TimestampPrecision::kMicroseconds: + nanos_ = nanos_ / 1'000 * 1'000; + break; + case TimestampPrecision::kNanoseconds: + break; + } + } + /// Exports the current timestamp as a std::chrono::time_point of millisecond /// precision. Note that the conversion may overflow since the internal /// `seconds_` value will need to be multiplied by 1000. From b029524b85253de943265bc7764295a1e8291569 Mon Sep 17 00:00:00 2001 From: Kapil Singh Date: Wed, 4 Sep 2024 17:24:55 +0530 Subject: [PATCH 5/6] Fix rebase --- velox/dwio/parquet/tests/reader/E2EFilterTest.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp index d248f1de6294..f50eaee29799 100644 --- a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp +++ b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp @@ -273,7 +273,7 @@ TEST_F(E2EFilterTest, timestampINT64MillisDictionary) { options_.enableDictionary = true; options_.dataPageSize = 4 * 1024; options_.parquetWriteTimestampUnit = - static_cast(TimestampUnit::kMilli); + std::optional(TimestampUnit::kMilli); options_.parquetWriteTimestampTimeZone = "utc"; testWithTypes( @@ -292,7 +292,7 @@ TEST_F(E2EFilterTest, timestampINT64MillisPlain) { options_.enableDictionary = false; options_.dataPageSize = 4 * 1024; options_.parquetWriteTimestampUnit = - static_cast(TimestampUnit::kMilli); + std::optional(TimestampUnit::kMilli); options_.parquetWriteTimestampTimeZone = "utc"; testWithTypes( @@ -311,7 +311,7 @@ TEST_F(E2EFilterTest, timestampINT64MicrosDictionary) { options_.enableDictionary = true; options_.dataPageSize = 4 * 1024; options_.parquetWriteTimestampUnit = - static_cast(TimestampUnit::kMicro); + std::optional(TimestampUnit::kMicro); options_.parquetWriteTimestampTimeZone = "utc"; testWithTypes( @@ -330,7 +330,7 @@ TEST_F(E2EFilterTest, timestampINT64MicrosPlain) { options_.enableDictionary = false; options_.dataPageSize = 4 * 1024; options_.parquetWriteTimestampUnit = - static_cast(TimestampUnit::kMicro); + std::optional(TimestampUnit::kMicro); options_.parquetWriteTimestampTimeZone = "utc"; testWithTypes( From 187e2c512a677efa7b2a3c1b96dabc394ae86ce1 Mon Sep 17 00:00:00 2001 From: Kapil Singh Date: Wed, 4 Sep 2024 17:36:20 +0530 Subject: [PATCH 6/6] Remove extra code --- velox/dwio/parquet/reader/ParquetReader.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 77f44a985896..9d2accb7ea94 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -109,8 +109,6 @@ class ReaderBase { thrift::LogicalType getTimestampLogicalType( thrift::ConvertedType::type type) const; - TypePtr convertType(const thrift::SchemaElement& schemaElement) const; - template static std::shared_ptr createRowType( const std::vector& children,