Fix reading MAP_KEY_VALUE Parquet SchemaElement
In Parquet, the map type is annotated with the MAP converted type. For
backward compatibility, however, a group annotated with MAP_KEY_VALUE that is
not contained in a MAP-annotated group should be handled as a MAP-annotated
group. The previous code had two mistakes:

1. If a group is annotated with MAP_KEY_VALUE, its parent type needs to be
checked to see whether it is a MAP group, but the previous code did not check
the parent. This commit adds a parentSchemaIdx parameter to the Parquet
reader's getParquetColumnInfo() function so the parent SchemaElement can be
checked.

2. The previous code mistakenly treated a MAP_KEY_VALUE group as the child of
a MAP-annotated group and assumed it must be repeated and have two children.
In fact, it can be optional instead of repeated and has only one child. This
commit moves the handling of this type before the LIST and MAP cases and takes
the type from its only child, which is a repeated group called "key_value".

For more information, see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
yingsu00 committed Dec 13, 2023
1 parent 085c753 commit 6af7c51
Showing 3 changed files with 81 additions and 28 deletions.
64 changes: 36 additions & 28 deletions velox/dwio/parquet/reader/ParquetReader.cpp
@@ -94,6 +94,7 @@ class ReaderBase {
uint32_t maxSchemaElementIdx,
uint32_t maxRepeat,
uint32_t maxDefine,
uint32_t parentSchemaIdx,
uint32_t& schemaIdx,
uint32_t& columnIdx) const;

@@ -213,7 +214,7 @@ void ReaderBase::initializeSchema() {
uint32_t columnIdx = 0;
uint32_t maxSchemaElementIdx = fileMetaData_->schema.size() - 1;
schemaWithId_ = getParquetColumnInfo(
maxSchemaElementIdx, maxRepeat, maxDefine, schemaIdx, columnIdx);
maxSchemaElementIdx, maxRepeat, maxDefine, -1, schemaIdx, columnIdx);
schema_ = createRowType(
schemaWithId_->getChildren(), isFileColumnNamesReadAsLowerCase());
}
@@ -222,6 +223,7 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
uint32_t maxSchemaElementIdx,
uint32_t maxRepeat,
uint32_t maxDefine,
uint32_t parentSchemaIdx,
uint32_t& schemaIdx,
uint32_t& columnIdx) const {
VELOX_CHECK(fileMetaData_ != nullptr);
@@ -253,22 +255,46 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(

std::vector<std::shared_ptr<const ParquetTypeWithId::TypeWithId>> children;

auto curSchemaIdx = schemaIdx;
for (int32_t i = 0; i < schemaElement.num_children; i++) {
auto child = getParquetColumnInfo(
maxSchemaElementIdx, maxRepeat, maxDefine, ++schemaIdx, columnIdx);
maxSchemaElementIdx,
maxRepeat,
maxDefine,
curSchemaIdx,
++schemaIdx,
columnIdx);
children.push_back(child);
}
VELOX_CHECK(!children.empty());

if (schemaElement.__isset.converted_type) {
switch (schemaElement.converted_type) {
case thrift::ConvertedType::MAP_KEY_VALUE:
// For backward-compatibility, a group annotated with MAP_KEY_VALUE
// that is not contained by a MAP-annotated group should be handled as
// a MAP-annotated group. The spec does not say whether a MAP-annotated
// SchemaElement can itself contain a MAP_KEY_VALUE element; in that
// case we throw an error.
if (parentSchemaIdx >= 0 &&
schema[parentSchemaIdx].converted_type ==
thrift::ConvertedType::MAP) {
VELOX_FAIL(
"Invalid Parquet file SchemaElement. Name: {} Converted type: {}, Parent converted type: {}",
schemaElement.name,
schemaElement.converted_type,
schema[parentSchemaIdx].converted_type);
}
FOLLY_FALLTHROUGH;

case thrift::ConvertedType::LIST:
case thrift::ConvertedType::MAP: {
auto element = children.at(0)->getChildren();
VELOX_CHECK_EQ(children.size(), 1);
auto child = children[0];
auto grandChildren = child->getChildren();
return std::make_shared<const ParquetTypeWithId>(
children[0]->type(),
std::move(element),
child->type(),
std::move(grandChildren),
curSchemaIdx, // TODO: there are holes in the ids
maxSchemaElementIdx,
ParquetTypeWithId::kNonLeaf, // columnIdx,
@@ -278,30 +304,12 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
maxRepeat + 1,
maxDefine);
}
case thrift::ConvertedType::MAP_KEY_VALUE: {
// child of MAP
VELOX_CHECK_EQ(
schemaElement.repetition_type,
thrift::FieldRepetitionType::REPEATED);
assert(children.size() == 2);
auto childrenCopy = children;
return std::make_shared<const ParquetTypeWithId>(
TypeFactory<TypeKind::MAP>::create(
children[0]->type(), children[1]->type()),
std::move(childrenCopy),
curSchemaIdx, // TODO: there are holes in the ids
maxSchemaElementIdx,
ParquetTypeWithId::kNonLeaf, // columnIdx,
std::move(name),
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine);
}

default:
VELOX_UNSUPPORTED(
"Unsupported SchemaElement type: {}",
schemaElement.converted_type);
VELOX_UNREACHABLE(
"Invalid SchemaElement converted_type: {}, name: {}",
schemaElement.converted_type,
schemaElement.name);
}
} else {
if (schemaElement.repetition_type ==
Binary file not shown.
45 changes: 45 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp
@@ -16,6 +16,7 @@

#include "velox/dwio/parquet/tests/ParquetTestBase.h"
#include "velox/expression/ExprToSubfieldFilter.h"
#include "velox/vector/tests/utils/VectorMaker.h"

using namespace facebook::velox;
using namespace facebook::velox::common;
@@ -515,6 +516,50 @@ TEST_F(ParquetReaderTest, parseIntDecimal) {
}
}

TEST_F(ParquetReaderTest, parseMapKeyValueAsMap) {
// map_key_value.parquet holds a single map column (key: VARCHAR, value:
// BIGINT) and 1 row that contains 8 map entries. It was written by an older
// version of Parquet and uses MAP_KEY_VALUE instead of MAP as the map
// SchemaElement converted_type.
const std::string sample(getExampleFilePath("map_key_value.parquet"));

facebook::velox::dwio::common::ReaderOptions readerOptions{defaultPool.get()};
auto reader = createReader(sample, readerOptions);
EXPECT_EQ(reader->numberOfRows(), 1ULL);

auto rowType = reader->typeWithId();
EXPECT_EQ(rowType->type()->kind(), TypeKind::ROW);
EXPECT_EQ(rowType->size(), 1ULL);

auto mapColumnType = rowType->childAt(0);
EXPECT_EQ(mapColumnType->type()->kind(), TypeKind::MAP);

auto mapKeyType = mapColumnType->childAt(0);
EXPECT_EQ(mapKeyType->type()->kind(), TypeKind::VARCHAR);

auto mapValueType = mapColumnType->childAt(1);
EXPECT_EQ(mapValueType->type()->kind(), TypeKind::BIGINT);

auto fileSchema =
ROW({"test"}, {createType<TypeKind::MAP>({VARCHAR(), BIGINT()})});
auto rowReaderOpts = getReaderOpts(fileSchema);
auto scanSpec = makeScanSpec(fileSchema);
rowReaderOpts.setScanSpec(scanSpec);
auto rowReader = reader->createRowReader(rowReaderOpts);

auto expected = makeRowVector({vectorMaker_.mapVector<std::string, int64_t>(
{{{"0", 0},
{"1", 1},
{"2", 2},
{"3", 3},
{"4", 4},
{"5", 5},
{"6", 6},
{"7", 7}}})});

assertReadWithReaderAndExpected(fileSchema, *rowReader, expected, *leafPool_);
}

TEST_F(ParquetReaderTest, readSampleBigintRangeFilter) {
// Read sample.parquet with the int filter "a BETWEEN 16 AND 20".
FilterMap filters;