Skip to content

Commit

Permalink
Fix Parquet Complex type handling
Browse files Browse the repository at this point in the history
Co-authored-by: hitarth <[email protected]>
  • Loading branch information
jaystarshot and hitarth committed Apr 11, 2024
1 parent e4f74d3 commit dd3aff6
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 29 deletions.
39 changes: 33 additions & 6 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
auto& schema = fileMetaData_->schema;
uint32_t curSchemaIdx = schemaIdx;
auto& schemaElement = schema[curSchemaIdx];
bool isRepeated = false;
bool isOptional = false;

if (schemaElement.__isset.repetition_type) {
if (schemaElement.repetition_type !=
Expand All @@ -244,6 +246,11 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
if (schemaElement.repetition_type ==
thrift::FieldRepetitionType::REPEATED) {
maxRepeat++;
isRepeated = true;
}
if (schemaElement.repetition_type ==
thrift::FieldRepetitionType::OPTIONAL) {
isOptional = true;
}
}

Expand Down Expand Up @@ -296,7 +303,9 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine);
maxDefine,
isOptional,
isRepeated);
}

// For backward-compatibility, a group annotated with MAP_KEY_VALUE
Expand All @@ -309,6 +318,12 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
VELOX_CHECK_EQ(children.size(), 1);
const auto& child = children[0];
auto type = child->type();
isRepeated = true;
// This level will not have the "isRepeated" info in the parquet
// schema since parquet schema will have a child layer which will have
// the "repeated info" which we are ignoring here, hence we set the
// isRepeated to true eg
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
return std::make_unique<ParquetTypeWithId>(
std::move(type),
std::move(*(ParquetTypeWithId*)child.get()).moveChildren(),
Expand All @@ -319,7 +334,9 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::nullopt,
std::nullopt,
maxRepeat + 1,
maxDefine);
maxDefine,
isOptional,
isRepeated);
}

default:
Expand All @@ -346,7 +363,9 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine);
maxDefine,
isOptional,
isRepeated);
} else if (children.size() == 2) {
// children of MAP
auto type = TypeFactory<TypeKind::MAP>::create(
Expand All @@ -361,7 +380,9 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine);
maxDefine,
isOptional,
isRepeated);
}
} else {
// Row type
Expand All @@ -376,7 +397,9 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine);
maxDefine,
isOptional,
isRepeated);
}
}
} else { // leaf node
Expand All @@ -402,6 +425,8 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
logicalType_,
maxRepeat,
maxDefine,
isOptional,
isRepeated,
precision,
scale,
type_length);
Expand All @@ -422,7 +447,9 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine - 1);
maxDefine - 1,
isOptional,
isRepeated);
}
return leafTypePtr;
}
Expand Down
19 changes: 11 additions & 8 deletions velox/dwio/parquet/reader/ParquetTypeWithId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ ParquetTypeWithId::moveChildren() && {
auto logicalType = parquetChild->logicalType_;
auto maxRepeat = parquetChild->maxRepeat_;
auto maxDefine = parquetChild->maxDefine_;
auto isOptional = parquetChild->isOptional_;
auto isRepeated = parquetChild->isRepeated_;
auto precision = parquetChild->precision_;
auto scale = parquetChild->scale_;
auto typeLength = parquetChild->typeLength_;
Expand All @@ -62,6 +64,8 @@ ParquetTypeWithId::moveChildren() && {
std::move(logicalType),
maxRepeat,
maxDefine,
isOptional,
isRepeated,
precision,
scale,
typeLength));
Expand All @@ -86,15 +90,14 @@ bool ParquetTypeWithId::hasNonRepeatedLeaf() const {
}

LevelMode ParquetTypeWithId::makeLevelInfo(LevelInfo& info) const {
int16_t repeatedAncestor = 0;
for (auto parent = parquetParent(); parent;
parent = parent->parquetParent()) {
if (parent->type()->kind() == TypeKind::ARRAY ||
parent->type()->kind() == TypeKind::MAP) {
repeatedAncestor = parent->maxDefine_;
break;
int16_t repeatedAncestor = maxDefine_;
auto node = this;
do {
if (node->isOptional_) {
repeatedAncestor--;
}
}
node = node->parquetParent();
} while (node && !node->isRepeated_);
bool isList = type()->kind() == TypeKind::ARRAY;
bool isStruct = type()->kind() == TypeKind::ROW;
bool isMap = type()->kind() == TypeKind::MAP;
Expand Down
6 changes: 6 additions & 0 deletions velox/dwio/parquet/reader/ParquetTypeWithId.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class ParquetTypeWithId : public dwio::common::TypeWithId {
std::optional<thrift::LogicalType> logicalType,
uint32_t maxRepeat,
uint32_t maxDefine,
bool isOptional,
bool isRepeated,
int32_t precision = 0,
int32_t scale = 0,
int32_t typeLength = 0)
Expand All @@ -54,6 +56,8 @@ class ParquetTypeWithId : public dwio::common::TypeWithId {
logicalType_(std::move(logicalType)),
maxRepeat_(maxRepeat),
maxDefine_(maxDefine),
isOptional_(isOptional),
isRepeated_(isRepeated),
precision_(precision),
scale_(scale),
typeLength_(typeLength) {}
Expand Down Expand Up @@ -81,6 +85,8 @@ class ParquetTypeWithId : public dwio::common::TypeWithId {
const std::optional<thrift::LogicalType> logicalType_;
const uint32_t maxRepeat_;
const uint32_t maxDefine_;
const bool isOptional_;
const bool isRepeated_;
const int32_t precision_;
const int32_t scale_;
const int32_t typeLength_;
Expand Down
Binary file not shown.
56 changes: 41 additions & 15 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@ class ParquetTableScanTest : public HiveConnectorTestBase {
createDuckDbTable({data});
}

void loadDataWithRowType(const std::string& filePath, RowVectorPtr data) {
splits_ = {makeSplit(filePath)};
auto pool = facebook::velox::memory::memoryManager()->addLeafPool();
dwio::common::ReaderOptions readerOpts{pool.get()};
auto reader = std::make_unique<ParquetReader>(
std::make_unique<facebook::velox::dwio::common::BufferedInput>(
std::make_shared<LocalReadFile>(filePath),
readerOpts.getMemoryPool()),
readerOpts);
rowType_ = reader->rowType();
createDuckDbTable({data});
}

std::string getExampleFilePath(const std::string& fileName) {
return facebook::velox::test::getDataFilePath(
"velox/dwio/parquet/tests/reader", "../examples/" + fileName);
Expand Down Expand Up @@ -303,9 +316,8 @@ TEST_F(ParquetTableScanTest, singleRowStruct) {
}

// Core dump and incorrect result are fixed.
TEST_F(ParquetTableScanTest, DISABLED_array) {
auto vector = makeArrayVector<int32_t>({{1, 2, 3}});

TEST_F(ParquetTableScanTest, array) {
auto vector = makeArrayVector<int32_t>({});
loadData(
getExampleFilePath("old_repeated_int.parquet"),
ROW({"repeatedInt"}, {ARRAY(INTEGER())}),
Expand All @@ -316,12 +328,11 @@ TEST_F(ParquetTableScanTest, DISABLED_array) {
}));

assertSelectWithFilter(
{"repeatedInt"}, {}, "", "SELECT repeatedInt FROM tmp");
{"repeatedInt"}, {}, "", "SELECT UNNEST(array[array[1,2,3]])");
}

// Optional array with required elements.
// Incorrect result.
TEST_F(ParquetTableScanTest, DISABLED_optArrayReqEle) {
TEST_F(ParquetTableScanTest, optArrayReqEle) {
auto vector = makeArrayVector<StringView>({});

loadData(
Expand All @@ -341,8 +352,7 @@ TEST_F(ParquetTableScanTest, DISABLED_optArrayReqEle) {
}

// Required array with required elements.
// Core dump is fixed, but the result is incorrect.
TEST_F(ParquetTableScanTest, DISABLED_reqArrayReqEle) {
TEST_F(ParquetTableScanTest, reqArrayReqEle) {
auto vector = makeArrayVector<StringView>({});

loadData(
Expand All @@ -362,8 +372,7 @@ TEST_F(ParquetTableScanTest, DISABLED_reqArrayReqEle) {
}

// Required array with optional elements.
// Incorrect result.
TEST_F(ParquetTableScanTest, DISABLED_reqArrayOptEle) {
TEST_F(ParquetTableScanTest, reqArrayOptEle) {
auto vector = makeArrayVector<StringView>({});

loadData(
Expand All @@ -382,22 +391,39 @@ TEST_F(ParquetTableScanTest, DISABLED_reqArrayOptEle) {
"SELECT UNNEST(array[array['a', null], array[], array[null, 'b']])");
}

TEST_F(ParquetTableScanTest, arrayOfArrayTest) {
auto vector = makeArrayVector<StringView>({});

loadDataWithRowType(
getExampleFilePath("array_of_array1.parquet"),
makeRowVector(
{"_1"},
{
vector,
}));

assertSelectWithFilter(
{"_1"},
{},
"",
"SELECT UNNEST(array[null, array[array['g', 'h'], null]])");
}

// Required array with legacy format.
// Incorrect result.
TEST_F(ParquetTableScanTest, DISABLED_reqArrayLegacy) {
TEST_F(ParquetTableScanTest, reqArrayLegacy) {
auto vector = makeArrayVector<StringView>({});

loadData(
getExampleFilePath("array_3.parquet"),
ROW({"_1"}, {ARRAY(VARCHAR())}),
ROW({"element"}, {ARRAY(VARCHAR())}),
makeRowVector(
{"_1"},
{"element"},
{
vector,
}));

assertSelectWithFilter(
{"_1"},
{"element"},
{},
"",
"SELECT UNNEST(array[array['a', 'b'], array[], array['c', 'd']])");
Expand Down

0 comments on commit dd3aff6

Please sign in to comment.