Skip to content

Commit

Permalink
PR Comment
Browse files Browse the repository at this point in the history
Add E2E test
  • Loading branch information
mskapilks committed Sep 4, 2024
1 parent f4764d3 commit a2f6ade
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 82 deletions.
30 changes: 30 additions & 0 deletions velox/dwio/common/tests/utils/DataSetBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,36 @@ DataSetBuilder& DataSetBuilder::makeMapStringValues(
return *this;
}

void DataSetBuilder::adjustTimestampToPrecision(
VectorPtr batch,
TimestampPrecision precision) {
auto type = batch->type();

if (type->kind() == TypeKind::TIMESTAMP) {
auto rawValues =
batch->asUnchecked<FlatVector<Timestamp>>()->mutableRawValues();
for (auto i = 0; i < batch->size(); ++i) {
if (batch->isNullAt(i)) {
continue;
}

rawValues[i].toPrecision(precision);
}
} else if (type->kind() == TypeKind::ROW) {
for (auto& child : batch->as<RowVector>()->children()) {
adjustTimestampToPrecision(child, precision);
}
}
}

DataSetBuilder& DataSetBuilder::adjustTimestampToPrecision(
TimestampPrecision precision) {
for (auto& batch : *batches_) {
adjustTimestampToPrecision(batch, precision);
}
return *this;
}

std::unique_ptr<std::vector<RowVectorPtr>> DataSetBuilder::build() {
return std::move(batches_);
}
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/common/tests/utils/DataSetBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class DataSetBuilder {
// groups. Tests skipping row groups based on row group stats.
DataSetBuilder& withRowGroupSpecificData(int32_t numRowsPerGroup);

DataSetBuilder& adjustTimestampToPrecision(TimestampPrecision precision);
void adjustTimestampToPrecision(
VectorPtr batch,
TimestampPrecision precision);

// Makes all data in 'batches_' after firstRow non-null. This finds a sampling
// of non-null values from each column and replaces nulls in the column in
// question with one of these. A column where only nulls are found in sampling
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
if (logicalTypeOpt.has_value() &&
logicalTypeOpt.value().__isset.TIMESTAMP &&
parquetFileType.parquetType_ == thrift::Type::INT64) {
return std::make_unique<TimestampINT64ColumnReader>(
return std::make_unique<TimestampInt64ColumnReader>(
requestedType, fileType, params, scanSpec);
} else {
return std::make_unique<TimestampINT96ColumnReader>(
return std::make_unique<TimestampInt96ColumnReader>(
requestedType, fileType, params, scanSpec);
}
}
Expand Down
60 changes: 32 additions & 28 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ class ReaderBase {
TypePtr convertType(
const thrift::SchemaElement& schemaElement,
const TypePtr& requestedType) const;

thrift::LogicalType getTimestampLogicalType(
thrift::ConvertedType::type type) const;

TypePtr convertType(const thrift::SchemaElement& schemaElement) const;

template <typename T>
static std::shared_ptr<const RowType> createRowType(
Expand Down Expand Up @@ -554,38 +559,14 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
int32_t typeLength =
schemaElement.__isset.type_length ? schemaElement.type_length : 0;
std::vector<std::unique_ptr<dwio::common::TypeWithId>> children;
std::optional<thrift::LogicalType> logicalType_ =
std::optional<thrift::LogicalType> logicalType =
schemaElement.__isset.logicalType
? std::optional<thrift::LogicalType>(schemaElement.logicalType)
: std::nullopt;

if (veloxType->kind() == TypeKind::TIMESTAMP &&
schemaElement.type == thrift::Type::INT64 &&
!logicalType_.has_value()) {
// Construct logical type from deprecated converted type of Parquet.
thrift::TimestampType timestamp;
timestamp.__set_isAdjustedToUTC(true);
thrift::TimeUnit unit;

if (schemaElement.converted_type ==
thrift::ConvertedType::TIMESTAMP_MICROS) {
thrift::MicroSeconds micros;
unit.__set_MICROS(micros);
} else if (
schemaElement.converted_type ==
thrift::ConvertedType::TIMESTAMP_MILLIS) {
thrift::MilliSeconds millis;
unit.__set_MILLIS(millis);
} else {
VELOX_NYI(
"{} Timestamp unit not supported.", schemaElement.converted_type);
}

timestamp.__set_unit(unit);
thrift::LogicalType newLogicalType;
newLogicalType.__set_TIMESTAMP(timestamp);

logicalType_ = std::optional<thrift::LogicalType>(newLogicalType);
schemaElement.type == thrift::Type::INT64 && !logicalType.has_value()) {
logicalType = getTimestampLogicalType(schemaElement.converted_type);
}

auto leafTypePtr = std::make_unique<ParquetTypeWithId>(
Expand All @@ -596,7 +577,7 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
columnIdx++,
name,
schemaElement.type,
logicalType_,
logicalType,
maxRepeat,
maxDefine,
isOptional,
Expand Down Expand Up @@ -632,6 +613,29 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
return nullptr;
}

thrift::LogicalType ReaderBase::getTimestampLogicalType(
thrift::ConvertedType::type convertedType) const {
thrift::TimestampType timestamp;
timestamp.__set_isAdjustedToUTC(true);
thrift::TimeUnit unit;

if (convertedType == thrift::ConvertedType::TIMESTAMP_MICROS) {
thrift::MicroSeconds micros;
unit.__set_MICROS(micros);
} else if (convertedType == thrift::ConvertedType::TIMESTAMP_MILLIS) {
thrift::MilliSeconds millis;
unit.__set_MILLIS(millis);
} else {
VELOX_NYI("{} Timestamp unit not supported.", convertedType);
}

timestamp.__set_unit(unit);
thrift::LogicalType logicalType;
logicalType.__set_TIMESTAMP(timestamp);

return logicalType;
}

TypePtr ReaderBase::convertType(
const thrift::SchemaElement& schemaElement,
const TypePtr& requestedType) const {
Expand Down
90 changes: 38 additions & 52 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

namespace facebook::velox::parquet {

class TimestampINT96ColumnReader : public IntegerColumnReader {
class TimestampInt96ColumnReader : public IntegerColumnReader {
public:
TimestampINT96ColumnReader(
TimestampInt96ColumnReader(
const TypePtr& requestedType,
std::shared_ptr<const dwio::common::TypeWithId> fileType,
ParquetParams& params,
Expand All @@ -49,19 +49,7 @@ class TimestampINT96ColumnReader : public IntegerColumnReader {
if (resultVector->isNullAt(i)) {
continue;
}
const auto timestamp = rawValues[i];
uint64_t nanos = timestamp.getNanos();
switch (timestampPrecision_) {
case TimestampPrecision::kMilliseconds:
nanos = nanos / 1'000'000 * 1'000'000;
break;
case TimestampPrecision::kMicroseconds:
nanos = nanos / 1'000 * 1'000;
break;
case TimestampPrecision::kNanoseconds:
break;
}
rawValues[i] = Timestamp(timestamp.getSeconds(), nanos);
rawValues[i].toPrecision(timestampPrecision_);
}
}

Expand All @@ -82,9 +70,9 @@ class TimestampINT96ColumnReader : public IntegerColumnReader {
TimestampPrecision timestampPrecision_;
};

class TimestampINT64ColumnReader : public IntegerColumnReader {
class TimestampInt64ColumnReader : public IntegerColumnReader {
public:
TimestampINT64ColumnReader(
TimestampInt64ColumnReader(
const TypePtr& requestedType,
std::shared_ptr<const dwio::common::TypeWithId> fileType,
ParquetParams& params,
Expand Down Expand Up @@ -112,7 +100,7 @@ class TimestampINT64ColumnReader : public IntegerColumnReader {
}

bool hasBulkPath() const override {
return false;
return true;
}

void
Expand Down Expand Up @@ -180,42 +168,23 @@ class TimestampINT64ColumnReader : public IntegerColumnReader {
getFlatValues<Timestamp, Timestamp>(rows, result, requestedType_);
}

Timestamp adjustToPrecision(
const Timestamp& timestamp,
TimestampPrecision precision) {
auto nano = timestamp.getNanos();
switch (precision) {
case TimestampPrecision::kMilliseconds:
nano = nano / 1'000'000 * 1'000'000;
break;
case TimestampPrecision::kMicroseconds:
nano = nano / 1'000 * 1'000;
break;
case TimestampPrecision::kNanoseconds:
break;
}

return Timestamp(timestamp.getSeconds(), nano);
}

void read(
vector_size_t offset,
RowSet rows,
const uint64_t* /*incomingNulls*/) override {
auto& data = formatData_->as<ParquetData>();
prepareRead<int64_t>(offset, rows, nullptr);

// Remove filter so that we can do filtering here once data is represented
// as timestamp type
const auto filter = scanSpec_->filter()->clone();
scanSpec_->setFilter(nullptr);

readCommon<IntegerColumnReader, true>(rows);
template <bool isDense>
void readHelper(common::Filter* filter, RowSet rows) {
dwio::common::ExtractToReader extractValues(this);
common::AlwaysTrue alwaysTrue;
dwio::common::ColumnVisitor<
int64_t,
common::AlwaysTrue,
decltype(extractValues),
isDense>
visitor(alwaysTrue, this, rows, extractValues);
readWithVisitor(rows, visitor);

auto tsValues =
AlignedBuffer::allocate<Timestamp>(numValues_, &memoryPool_);
auto rawTs = tsValues->asMutable<Timestamp>();
auto rawTsInt64 = values_->asMutable<int64_t>();

const auto rawNulls =
resultNulls() ? resultNulls()->as<uint64_t>() : nullptr;

Expand All @@ -227,12 +196,14 @@ class TimestampINT64ColumnReader : public IntegerColumnReader {
} else {
timestamp = Timestamp::fromMillis(rawTsInt64[i]);
}

rawTs[i] = adjustToPrecision(timestamp, timestampPrecision_);
rawTs[i] = timestamp;
rawTs[i].toPrecision(timestampPrecision_);
}
}

values_ = tsValues;
rawValues_ = values_->asMutable<char>();
outputRows_.clear();

switch (
!filter ||
Expand All @@ -253,11 +224,26 @@ class TimestampINT64ColumnReader : public IntegerColumnReader {
break;
case common::FilterKind::kTimestampRange:
case common::FilterKind::kMultiRange:
processFilter(filter.get(), rows, rawNulls);
processFilter(filter, rows, rawNulls);
break;
default:
VELOX_UNSUPPORTED("Unsupported filter.");
}
}

void read(
vector_size_t offset,
RowSet rows,
const uint64_t* /*incomingNulls*/) override {
auto& data = formatData_->as<ParquetData>();
prepareRead<int64_t>(offset, rows, nullptr);

bool isDense = rows.back() == rows.size() - 1;
if (isDense) {
readHelper<true>(scanSpec_->filter(), rows);
} else {
readHelper<false>(scanSpec_->filter(), rows);
}

readOffset_ += rows.back() + 1;
}
Expand Down
76 changes: 76 additions & 0 deletions velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,82 @@ TEST_F(E2EFilterTest, timestampDictionary) {
20);
}

TEST_F(E2EFilterTest, timestampINT64MillisDictionary) {
options_.enableDictionary = true;
options_.dataPageSize = 4 * 1024;
options_.parquetWriteTimestampUnit =
static_cast<uint8_t>(TimestampUnit::kMilli);
options_.parquetWriteTimestampTimeZone = "utc";

testWithTypes(
"timestamp_val_0:timestamp,"
"timestamp_val_1:timestamp",
[&]() {
dataSetBuilder_->adjustTimestampToPrecision(
TimestampPrecision::kMilliseconds);
},
true,
{"timestamp_val_0", "timestamp_val_1"},
1);
}

TEST_F(E2EFilterTest, timestampINT64MillisPlain) {
options_.enableDictionary = false;
options_.dataPageSize = 4 * 1024;
options_.parquetWriteTimestampUnit =
static_cast<uint8_t>(TimestampUnit::kMilli);
options_.parquetWriteTimestampTimeZone = "utc";

testWithTypes(
"timestamp_val_0:timestamp,"
"timestamp_val_1:timestamp",
[&]() {
dataSetBuilder_->adjustTimestampToPrecision(
TimestampPrecision::kMilliseconds);
},
true,
{"timestamp_val_0", "timestamp_val_1"},
1);
}

TEST_F(E2EFilterTest, timestampINT64MicrosDictionary) {
options_.enableDictionary = true;
options_.dataPageSize = 4 * 1024;
options_.parquetWriteTimestampUnit =
static_cast<uint8_t>(TimestampUnit::kMicro);
options_.parquetWriteTimestampTimeZone = "utc";

testWithTypes(
"timestamp_val_0:timestamp,"
"timestamp_val_1:timestamp",
[&]() {
dataSetBuilder_->adjustTimestampToPrecision(
TimestampPrecision::kMicroseconds);
},
true,
{"timestamp_val_0", "timestamp_val_1"},
1);
}

TEST_F(E2EFilterTest, timestampINT64MicrosPlain) {
options_.enableDictionary = false;
options_.dataPageSize = 4 * 1024;
options_.parquetWriteTimestampUnit =
static_cast<uint8_t>(TimestampUnit::kMicro);
options_.parquetWriteTimestampTimeZone = "utc";

testWithTypes(
"timestamp_val_0:timestamp,"
"timestamp_val_1:timestamp",
[&]() {
dataSetBuilder_->adjustTimestampToPrecision(
TimestampPrecision::kMicroseconds);
},
true,
{"timestamp_val_0", "timestamp_val_1"},
1);
}

TEST_F(E2EFilterTest, floatAndDoubleDirect) {
options_.enableDictionary = false;
options_.dataPageSize = 4 * 1024;
Expand Down
Loading

0 comments on commit a2f6ade

Please sign in to comment.