diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 07471ba15e24..92b767ef6a5c 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -94,6 +94,7 @@ class ReaderBase { uint32_t maxSchemaElementIdx, uint32_t maxRepeat, uint32_t maxDefine, + uint32_t parentSchemaIdx, uint32_t& schemaIdx, uint32_t& columnIdx) const; @@ -212,8 +213,11 @@ void ReaderBase::initializeSchema() { uint32_t schemaIdx = 0; uint32_t columnIdx = 0; uint32_t maxSchemaElementIdx = fileMetaData_->schema.size() - 1; + // Setting the parent schema index of the root("hive_schema") to be 0, which + // is the root itself. This is ok because it's never required to check the + // parent of the root in getParquetColumnInfo(). schemaWithId_ = getParquetColumnInfo( - maxSchemaElementIdx, maxRepeat, maxDefine, schemaIdx, columnIdx); + maxSchemaElementIdx, maxRepeat, maxDefine, 0, schemaIdx, columnIdx); schema_ = createRowType( schemaWithId_->getChildren(), isFileColumnNamesReadAsLowerCase()); } @@ -222,6 +226,7 @@ std::shared_ptr ReaderBase::getParquetColumnInfo( uint32_t maxSchemaElementIdx, uint32_t maxRepeat, uint32_t maxDefine, + uint32_t parentSchemaIdx, uint32_t& schemaIdx, uint32_t& columnIdx) const { VELOX_CHECK(fileMetaData_ != nullptr); @@ -253,22 +258,60 @@ std::shared_ptr ReaderBase::getParquetColumnInfo( std::vector> children; + auto curSchemaIdx = schemaIdx; for (int32_t i = 0; i < schemaElement.num_children; i++) { auto child = getParquetColumnInfo( - maxSchemaElementIdx, maxRepeat, maxDefine, ++schemaIdx, columnIdx); + maxSchemaElementIdx, + maxRepeat, + maxDefine, + curSchemaIdx, + ++schemaIdx, + columnIdx); children.push_back(child); } VELOX_CHECK(!children.empty()); if (schemaElement.__isset.converted_type) { switch (schemaElement.converted_type) { + case thrift::ConvertedType::MAP_KEY_VALUE: + // If the MAP_KEY_VALUE annotated group's parent is a MAP, it should + // be the repeated key_value group that directly contains the key and + // value children. + if (schema[parentSchemaIdx].converted_type == + thrift::ConvertedType::MAP) { + VELOX_CHECK_EQ( + schemaElement.repetition_type, + thrift::FieldRepetitionType::REPEATED); + VELOX_CHECK_EQ(children.size(), 2); + + auto childrenCopy = children; + return std::make_shared( + TypeFactory::create( + children[0]->type(), children[1]->type()), + std::move(childrenCopy), + curSchemaIdx, // TODO: there are holes in the ids + maxSchemaElementIdx, + ParquetTypeWithId::kNonLeaf, // columnIdx, + std::move(name), + std::nullopt, + std::nullopt, + maxRepeat, + maxDefine); + } + + // For backward-compatibility, a group annotated with MAP_KEY_VALUE + // that is not contained by a MAP-annotated group should be handled as + // a MAP-annotated group. + FOLLY_FALLTHROUGH; + case thrift::ConvertedType::LIST: case thrift::ConvertedType::MAP: { - auto element = children.at(0)->getChildren(); VELOX_CHECK_EQ(children.size(), 1); + const auto& child = children[0]; + auto grandChildren = child->getChildren(); return std::make_shared( - children[0]->type(), - std::move(element), + child->type(), + std::move(grandChildren), curSchemaIdx, // TODO: there are holes in the ids maxSchemaElementIdx, ParquetTypeWithId::kNonLeaf, // columnIdx, @@ -278,30 +321,12 @@ std::shared_ptr ReaderBase::getParquetColumnInfo( maxRepeat + 1, maxDefine); } - case thrift::ConvertedType::MAP_KEY_VALUE: { - // child of MAP - VELOX_CHECK_EQ( - schemaElement.repetition_type, - thrift::FieldRepetitionType::REPEATED); - assert(children.size() == 2); - auto childrenCopy = children; - return std::make_shared( - TypeFactory::create( - children[0]->type(), children[1]->type()), - std::move(childrenCopy), - curSchemaIdx, // TODO: there are holes in the ids - maxSchemaElementIdx, - ParquetTypeWithId::kNonLeaf, // columnIdx, - std::move(name), - std::nullopt, - std::nullopt, - maxRepeat, - maxDefine); - } + default: - VELOX_UNSUPPORTED( - "Unsupported SchemaElement type: {}", - schemaElement.converted_type); + VELOX_UNREACHABLE( + "Invalid SchemaElement converted_type: {}, name: {}", + schemaElement.converted_type, + schemaElement.name); } } else { if (schemaElement.repetition_type == diff --git a/velox/dwio/parquet/tests/examples/map_key_value.parquet b/velox/dwio/parquet/tests/examples/map_key_value.parquet new file mode 100644 index 000000000000..ae0755cf0b25 Binary files /dev/null and b/velox/dwio/parquet/tests/examples/map_key_value.parquet differ diff --git a/velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp b/velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp index e9fc44b18cc0..f247b6540223 100644 --- a/velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp @@ -16,6 +16,7 @@ #include "velox/dwio/parquet/tests/ParquetTestBase.h" #include "velox/expression/ExprToSubfieldFilter.h" +#include "velox/vector/tests/utils/VectorMaker.h" using namespace facebook::velox; using namespace facebook::velox::common; @@ -510,6 +511,58 @@ TEST_F(ParquetReaderTest, parseIntDecimal) { } } +TEST_F(ParquetReaderTest, parseMapKeyValueAsMap) { + // map_key_value.parquet holds a single map column (key: VARCHAR, b: BIGINT) + // and 1 row that contains 8 map entries. It is with older version of Parquet + // and uses MAP_KEY_VALUE instead of MAP as the map SchemaElement + // converted_type. It has 5 SchemaElements in the schema, in the format of + // schemaIdx: name (): + // + // 0: REQUIRED BOOLEAN hive_schema (UTF8) + // 1: OPTIONAL BOOLEAN test (MAP_KEY_VALUE) + // 2: REPEATED BOOLEAN map (UTF8) + // 3: REQUIRED BYTE_ARRAY key (UTF8) + // 4: OPTIONAL INT64 value (UTF8) + + const std::string sample(getExampleFilePath("map_key_value.parquet")); + + facebook::velox::dwio::common::ReaderOptions readerOptions{leafPool_.get()}; + auto reader = createReader(sample, readerOptions); + EXPECT_EQ(reader->numberOfRows(), 1ULL); + + auto rowType = reader->typeWithId(); + EXPECT_EQ(rowType->type()->kind(), TypeKind::ROW); + EXPECT_EQ(rowType->size(), 1ULL); + + auto mapColumnType = rowType->childAt(0); + EXPECT_EQ(mapColumnType->type()->kind(), TypeKind::MAP); + + auto mapKeyType = mapColumnType->childAt(0); + EXPECT_EQ(mapKeyType->type()->kind(), TypeKind::VARCHAR); + + auto mapValueType = mapColumnType->childAt(1); + EXPECT_EQ(mapValueType->type()->kind(), TypeKind::BIGINT); + + auto fileSchema = + ROW({"test"}, {createType({VARCHAR(), BIGINT()})}); + auto rowReaderOpts = getReaderOpts(fileSchema); + auto scanSpec = makeScanSpec(fileSchema); + rowReaderOpts.setScanSpec(scanSpec); + auto rowReader = reader->createRowReader(rowReaderOpts); + + auto expected = makeRowVector({vectorMaker_.mapVector( + {{{"0", 0}, + {"1", 1}, + {"2", 2}, + {"3", 3}, + {"4", 4}, + {"5", 5}, + {"6", 6}, + {"7", 7}}})}); + + assertReadWithReaderAndExpected(fileSchema, *rowReader, expected, *leafPool_); +} + TEST_F(ParquetReaderTest, readSampleBigintRangeFilter) { // Read sample.parquet with the int filter "a BETWEEN 16 AND 20". FilterMap filters;