Skip to content

Commit

Permalink
Support timestamp reader
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Jul 2, 2024
1 parent 6caa7d5 commit 71c2518
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 4 deletions.
3 changes: 3 additions & 0 deletions velox/dwio/common/SelectiveColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ void SelectiveColumnReader::getIntValues(
VELOX_FAIL("Unsupported value size: {}", valueSize_);
}
break;
case TypeKind::TIMESTAMP:
getFlatValues<Timestamp, Timestamp>(rows, result, requestedType);
break;
default:
VELOX_FAIL(
"Not a valid type for integer reader: {}", requestedType->toString());
Expand Down
46 changes: 45 additions & 1 deletion velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,49 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
}
break;
}
case thrift::Type::INT96: {
auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp);
dictionary_.values = AlignedBuffer::allocate<char>(numVeloxBytes, &pool_);
auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp);
if (pageData_) {
memcpy(dictionary_.values->asMutable<char>(), pageData_, numBytes);
} else {
dwio::common::readBytes(
numBytes,
inputStream_.get(),
dictionary_.values->asMutable<char>(),
bufferStart_,
bufferEnd_);
}
// Expand the Parquet type length values to Velox type length.
// We start from the end to allow in-place expansion.
auto values = dictionary_.values->asMutable<Timestamp>();
auto parquetValues = dictionary_.values->asMutable<char>();

for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
// Convert the timestamp into seconds and nanos since the Unix epoch,
// 00:00:00.000000 on 1 January 1970.
uint64_t nanos;
memcpy(
&nanos,
parquetValues + i * sizeof(Int96Timestamp),
sizeof(uint64_t));
int32_t days;
memcpy(
&days,
parquetValues + i * sizeof(Int96Timestamp) + sizeof(uint64_t),
sizeof(int32_t));
int64_t seconds = (days - Timestamp::kJulianToUnixEpochDays) *
Timestamp::kSecondsPerDay;
if (nanos > Timestamp::kMaxNanos) {
seconds += nanos / Timestamp::kNanosInSecond;
nanos -=
(nanos / Timestamp::kNanosInSecond) * Timestamp::kNanosInSecond;
}
values[i] = Timestamp(seconds, nanos);
}
break;
}
case thrift::Type::BYTE_ARRAY: {
dictionary_.values =
AlignedBuffer::allocate<StringView>(dictionary_.numValues, &pool_);
Expand Down Expand Up @@ -461,7 +504,6 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
VELOX_UNSUPPORTED(
"Parquet type {} not supported for dictionary", parquetType);
}
case thrift::Type::INT96:
default:
VELOX_UNSUPPORTED(
"Parquet type {} not supported for dictionary", parquetType);
Expand All @@ -488,6 +530,8 @@ int32_t parquetTypeBytes(thrift::Type::type type) {
case thrift::Type::INT64:
case thrift::Type::DOUBLE:
return 8;
case thrift::Type::INT96:
return 12;
default:
VELOX_FAIL("Type does not have a byte width {}", type);
}
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "velox/dwio/parquet/reader/RepeatedColumnReader.h"
#include "velox/dwio/parquet/reader/StringColumnReader.h"
#include "velox/dwio/parquet/reader/StructColumnReader.h"
#include "velox/dwio/parquet/reader/TimestampColumnReader.h"

namespace facebook::velox::parquet {

Expand Down Expand Up @@ -74,6 +75,10 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
return std::make_unique<BooleanColumnReader>(
requestedType, fileType, params, scanSpec);

case TypeKind::TIMESTAMP:
return std::make_unique<TimestampColumnReader>(
requestedType, fileType, params, scanSpec);

default:
VELOX_FAIL(
"buildReader unhandled type: " +
Expand Down
49 changes: 49 additions & 0 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/dwio/parquet/reader/IntegerColumnReader.h"
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"

namespace facebook::velox::parquet {

class TimestampColumnReader : public IntegerColumnReader {
public:
TimestampColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
std::shared_ptr<const dwio::common::TypeWithId> fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
: IntegerColumnReader(requestedType, fileType, params, scanSpec) {}

bool hasBulkPath() const override {
return false;
}

void read(
vector_size_t offset,
RowSet rows,
const uint64_t* /*incomingNulls*/) override {
auto& data = formatData_->as<ParquetData>();
// Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length.
prepareRead<int128_t>(offset, rows, nullptr);
readCommon<IntegerColumnReader, true>(rows);
readOffset_ += rows.back() + 1;
}
};

} // namespace facebook::velox::parquet
Binary file not shown.
106 changes: 106 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,34 @@ class ParquetTableScanTest : public HiveConnectorTestBase {
assertQuery(plan, splits_, sql);
}

void assertSelectWithFilter(
std::vector<std::string>&& outputColumnNames,
const std::vector<std::string>& subfieldFilters,
const std::string& remainingFilter,
const std::string& sql,
bool isFilterPushdownEnabled) {
auto rowType = getRowType(std::move(outputColumnNames));
parse::ParseOptions options;
options.parseDecimalAsDouble = false;

auto plan = PlanBuilder(pool_.get())
.setParseOptions(options)
// Function extractFiltersFromRemainingFilter will extract
// filters to subfield filters, but for some types, filter
// pushdown is not supported.
.tableScan(
"hive_table",
rowType,
{},
subfieldFilters,
remainingFilter,
nullptr,
isFilterPushdownEnabled)
.planNode();

assertQuery(plan, splits_, sql);
}

void assertSelectWithAgg(
std::vector<std::string>&& outputColumnNames,
const std::vector<std::string>& aggregates,
Expand Down Expand Up @@ -674,6 +702,84 @@ TEST_F(ParquetTableScanTest, sessionTimezone) {
assertSelectWithTimezone({"a"}, "SELECT a FROM tmp", "Asia/Shanghai");
}

TEST_F(ParquetTableScanTest, timestampFilter) {
// Timestamp-int96.parquet holds one column (t: TIMESTAMP) and
// 10 rows in one row group. Data is in SNAPPY compressed format.
// The values are:
// |t |
// +-------------------+
// |2015-06-01 19:34:56|
// |2015-06-02 19:34:56|
// |2001-02-03 03:34:06|
// |1998-03-01 08:01:06|
// |2022-12-23 03:56:01|
// |1980-01-24 00:23:07|
// |1999-12-08 13:39:26|
// |2023-04-21 09:09:34|
// |2000-09-12 22:36:29|
// |2007-12-12 04:27:56|
// +-------------------+
auto vector = makeFlatVector<Timestamp>(
{Timestamp(1433116800, 70496000000000),
Timestamp(1433203200, 70496000000000),
Timestamp(981158400, 12846000000000),
Timestamp(888710400, 28866000000000),
Timestamp(1671753600, 14161000000000),
Timestamp(317520000, 1387000000000),
Timestamp(944611200, 49166000000000),
Timestamp(1682035200, 32974000000000),
Timestamp(968716800, 81389000000000),
Timestamp(1197417600, 16076000000000)});

loadData(
getExampleFilePath("timestamp_int96.parquet"),
ROW({"t"}, {TIMESTAMP()}),
makeRowVector(
{"t"},
{
vector,
}));

assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp", false);
assertSelectWithFilter(
{"t"},
{},
"t < TIMESTAMP '2000-09-12 22:36:29'",
"SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'",
false);
assertSelectWithFilter(
{"t"},
{},
"t <= TIMESTAMP '2000-09-12 22:36:29'",
"SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'",
false);
assertSelectWithFilter(
{"t"},
{},
"t > TIMESTAMP '1980-01-24 00:23:07'",
"SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'",
false);
assertSelectWithFilter(
{"t"},
{},
"t >= TIMESTAMP '1980-01-24 00:23:07'",
"SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'",
false);
assertSelectWithFilter(
{"t"},
{},
"t == TIMESTAMP '2022-12-23 03:56:01'",
"SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'",
false);
VELOX_ASSERT_THROW(
assertSelectWithFilter(
{"t"},
{"t < TIMESTAMP '2000-09-12 22:36:29'"},
"",
"SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"),
"testInt128() is not supported");
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::Init init{&argc, &argv, false};
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,13 @@ PlanBuilder& PlanBuilder::tableScan(
const RowTypePtr& dataColumns,
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& assignments) {
std::shared_ptr<connector::ColumnHandle>>& assignments,
bool isFilterPushdownEnabled) {
return TableScanBuilder(*this)
.tableName(tableName)
.outputType(outputType)
.columnAliases(columnAliases)
.filterPushdown(isFilterPushdownEnabled)
.subfieldFilters(subfieldFilters)
.remainingFilter(remainingFilter)
.dataColumns(dataColumns)
Expand Down Expand Up @@ -204,7 +206,7 @@ core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) {
tableHandle_ = std::make_shared<HiveTableHandle>(
connectorId_,
tableName_,
true,
isFilterPushdownEnabled_,
std::move(filters),
remainingFilterExpr,
dataColumns_);
Expand Down
10 changes: 9 additions & 1 deletion velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ class PlanBuilder {
const RowTypePtr& dataColumns = nullptr,
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& assignments = {});
std::shared_ptr<connector::ColumnHandle>>& assignments = {},
bool isFilterPushdownEnabled = true);

/// Add a TableScanNode to scan a TPC-H table.
///
Expand Down Expand Up @@ -187,6 +188,12 @@ class PlanBuilder {
return *this;
}

/// @param isFilterPushdownEnabled Whether filter push-down is enabled.
TableScanBuilder& filterPushdown(bool isFilterPushdownEnabled) {
isFilterPushdownEnabled_ = isFilterPushdownEnabled;
return *this;
}

/// @param outputType List of column names and types to read from the table.
/// This property is required.
TableScanBuilder& outputType(RowTypePtr outputType) {
Expand Down Expand Up @@ -273,6 +280,7 @@ class PlanBuilder {
PlanBuilder& planBuilder_;
std::string tableName_{"hive_table"};
std::string connectorId_{"test-hive"};
bool isFilterPushdownEnabled_ = true;
RowTypePtr outputType_;
std::vector<std::string> subfieldFilters_;
std::string remainingFilter_;
Expand Down
4 changes: 4 additions & 0 deletions velox/type/Timestamp.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ struct Timestamp {
static constexpr int64_t kMicrosecondsInMillisecond = 1'000;
static constexpr int64_t kNanosecondsInMicrosecond = 1'000;
static constexpr int64_t kNanosecondsInMillisecond = 1'000'000;
static constexpr int64_t kNanosInSecond =
kNanosecondsInMillisecond * kMillisecondsInSecond;
static constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
static constexpr int64_t kSecondsPerDay = 86400LL;

// Limit the range of seconds to avoid some problems. Seconds should be
// in the range [INT64_MIN/1000 - 1, INT64_MAX/1000].
Expand Down
5 changes: 5 additions & 0 deletions velox/type/Type.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ namespace facebook::velox {

using int128_t = __int128_t;

struct __attribute__((__packed__)) Int96Timestamp {
int32_t days;
uint64_t nanos;
};

/// Velox type system supports a small set of SQL-compatible composeable types:
/// BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, HUGEINT, REAL, DOUBLE, VARCHAR,
/// VARBINARY, TIMESTAMP, ARRAY, MAP, ROW
Expand Down

0 comments on commit 71c2518

Please sign in to comment.