From f27b5c70b21e5fc8b9b71fddd953cddbc0350a88 Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Wed, 5 Jun 2024 14:21:46 +0800 Subject: [PATCH] [GLUTEN-3582][CH] Using ParquetBlockInputFormat instead of VectorizedParquetBlockInputFormat for complex type --- .../GlutenClickHouseHiveTableSuite.scala | 1 + .../SubstraitSource/ParquetFormatFile.cpp | 20 +++++++++++-- .../SubstraitSource/ParquetFormatFile.h | 4 ++- .../local-engine/tests/gtest_parquet_read.cpp | 29 +++++++++++++++++++ 4 files changed, 50 insertions(+), 4 deletions(-) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala index 3c993b622018..9b52f6a8cb53 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala @@ -111,6 +111,7 @@ class GlutenClickHouseHiveTableSuite getClass.getResource("/").getPath + "tests-working-home/spark-warehouse") .set("spark.hive.exec.dynamic.partition.mode", "nonstrict") .set("spark.gluten.supported.hive.udfs", "my_add") + .set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true") .setMaster("local[*]") } diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp index 2e0f000456d8..f557df5b27bf 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp @@ -22,7 +22,7 @@ #include #include -#include +#include #include #include #include @@ -46,12 +46,13 @@ extern const int UNKNOWN_TYPE; namespace local_engine { + ParquetFormatFile::ParquetFormatFile( const DB::ContextPtr & context_, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, const ReadBufferBuilderPtr & read_buffer_builder_, bool use_local_format_) - : FormatFile(context_, file_info_, read_buffer_builder_), use_local_format(use_local_format_) + : FormatFile(context_, file_info_, read_buffer_builder_), use_pageindex_reader(use_local_format_) { } @@ -85,7 +86,7 @@ FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(const DB::Block std::ranges::set_difference(total_row_group_indices, required_row_group_indices, std::back_inserter(skip_row_group_indices)); format_settings.parquet.skip_row_groups = std::unordered_set(skip_row_group_indices.begin(), skip_row_group_indices.end()); - if (use_local_format) + if (use_pageindex_reader && pageindex_reader_support(header)) res->input = std::make_shared(*(res->read_buffer), header, format_settings); else res->input = std::make_shared(*(res->read_buffer), header, format_settings, 1, 8192); @@ -112,6 +113,19 @@ std::optional ParquetFormatFile::getTotalRows() return total_rows; } } +bool ParquetFormatFile::pageindex_reader_support(const DB::Block & header) +{ + const auto result = std::ranges::find_if( + header, + [](DB::ColumnWithTypeAndName const & col) + { + const DB::DataTypePtr type_not_nullable = DB::removeNullable(col.type); + const DB::WhichDataType which(type_not_nullable); + return DB::isArray(which) || DB::isMap(which) || DB::isTuple(which); + }); + + return result == header.end(); +} std::vector ParquetFormatFile::collectRequiredRowGroups(int & total_row_groups) const { diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h index 045f0049d674..ba7f28883e65 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h @@ -55,8 +55,10 @@ class ParquetFormatFile : public FormatFile String getFileFormat() const override { return "Parquet"; } + static bool pageindex_reader_support(const DB::Block & header); + private: - bool use_local_format; + bool use_pageindex_reader; std::mutex mutex; std::optional total_rows; diff --git a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp index 94f28763e679..9623ffa98d28 100644 --- a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp +++ b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp @@ -15,6 +15,9 @@ * limitations under the License. */ +#include + + #include "config.h" #if USE_PARQUET @@ -139,6 +142,32 @@ TEST(ParquetRead, ReadSchema) readSchema("alltypes/alltypes_null.parquet"); } +TEST(ParquetRead, VerifyPageindexReaderSupport) +{ + EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support( + toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("alltypes/alltypes_notnull.parquet"))))); + EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support( + toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("alltypes/alltypes_null.parquet"))))); + + + EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support( + toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("array.parquet"))))); + EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support( + toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("date.parquet"))))); + EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support( + toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("datetime64.parquet"))))); + EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support( + toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("decimal.parquet"))))); + EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support( + toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("iris.parquet"))))); + EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support( + toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("map.parquet"))))); + EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support( + toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("sample.parquet"))))); + EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support( + toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("struct.parquet"))))); +} + TEST(ParquetRead, ReadDataNotNull) { const std::map fields{