From 0a7ccba4e6ecd7b86c61f603bff4d2bc28bd1148 Mon Sep 17 00:00:00 2001 From: lgbo Date: Mon, 12 Dec 2022 17:23:39 +0800 Subject: [PATCH] Improve: reducing the open operation for the same file when reading parquet files (#213) * optimization for reading parquet files. reducing the open operation for the same file * update --- .../Storages/SubstraitSource/FormatFile.h | 5 -- .../SubstraitSource/ParquetFormatFile.cpp | 63 +++++++++++++------ .../SubstraitSource/ParquetFormatFile.h | 16 +++-- 3 files changed, 55 insertions(+), 29 deletions(-) diff --git a/utils/local-engine/Storages/SubstraitSource/FormatFile.h b/utils/local-engine/Storages/SubstraitSource/FormatFile.h index 59d8d720f2a0..351ba8773287 100644 --- a/utils/local-engine/Storages/SubstraitSource/FormatFile.h +++ b/utils/local-engine/Storages/SubstraitSource/FormatFile.h @@ -18,12 +18,7 @@ class FormatFile struct InputFormat { public: - explicit InputFormat(DB::InputFormatPtr input_, std::unique_ptr read_buffer_) - : input(input_), read_buffer(std::move(read_buffer_)) - { - } DB::InputFormatPtr input; - private: std::unique_ptr read_buffer; }; using InputFormatPtr = std::shared_ptr; diff --git a/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp index 6b9f4eced2e1..b03ae71b5b3b 100644 --- a/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp +++ b/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -24,11 +25,28 @@ ParquetFormatFile::ParquetFormatFile(DB::ContextPtr context_, const substrait::R FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(const DB::Block & header) { - auto row_group_indices = collectRowGroupIndices(); - auto read_buffer = read_buffer_builder->build(file_info); + auto res = std::make_shared(); + res->read_buffer = std::move(read_buffer_builder->build(file_info)); + std::vector row_group_indices; + std::vector required_row_groups; + if (auto * seekable_in = dynamic_cast(res->read_buffer.get())) + { + // reuse the read_buffer to avoid opening the file twice. + // especially,the cost of opening a hdfs file is large. + required_row_groups = collectRequiredRowGroups(seekable_in); + seekable_in->seek(0, SEEK_SET); + } + else + { + required_row_groups = collectRequiredRowGroups(); + } + for (const auto & row_group : required_row_groups) + { + row_group_indices.emplace_back(row_group.index); + } auto format_settings = DB::getFormatSettings(context); - auto input_format = std::make_shared(*read_buffer, header, format_settings, row_group_indices); - auto res = std::make_shared(input_format, std::move(read_buffer)); + auto input_format = std::make_shared(*(res->read_buffer), header, format_settings, row_group_indices); + res->input = input_format; return res; } @@ -39,12 +57,11 @@ std::optional ParquetFormatFile::getTotalRows() if (total_rows) return total_rows; } - auto row_group_indices = collectRowGroupIndices(); + auto rowgroups = collectRequiredRowGroups(); size_t rows = 0; - auto file_meta = reader->parquet_reader()->metadata(); - for (auto i : row_group_indices) + for (const auto & rowgroup : rowgroups) { - rows += file_meta->RowGroup(i)->num_rows(); + rows += rowgroup.num_rows; } { std::lock_guard lock(mutex); @@ -53,28 +70,27 @@ std::optional ParquetFormatFile::getTotalRows() } } -void ParquetFormatFile::prepareReader() +std::vector ParquetFormatFile::collectRequiredRowGroups() { - std::lock_guard lock(mutex); - if (reader) - return; auto in = read_buffer_builder->build(file_info); + return collectRequiredRowGroups(in.get()); +} + +std::vector ParquetFormatFile::collectRequiredRowGroups(DB::ReadBuffer * read_buffer) +{ + std::unique_ptr reader; DB::FormatSettings format_settings; format_settings.seekable_read = true; std::atomic is_stopped{0}; auto status = parquet::arrow::OpenFile( - asArrowFile(*in, format_settings, is_stopped), arrow::default_memory_pool(), &reader); + asArrowFile(*read_buffer, format_settings, is_stopped), arrow::default_memory_pool(), &reader); if (!status.ok()) { throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Open file({}) failed. {}", file_info.uri_file(), status.ToString()); } -} -std::vector ParquetFormatFile::collectRowGroupIndices() -{ - prepareReader(); auto file_meta = reader->parquet_reader()->metadata(); - std::vector indices; + std::vector row_group_metadatas; for (int i = 0, n = file_meta->num_row_groups(); i < n; ++i) { auto row_group_meta = file_meta->RowGroup(i); @@ -85,9 +101,16 @@ std::vector ParquetFormatFile::collectRowGroupIndices() } if (file_info.start() <= offset && offset < file_info.start() + file_info.length()) { - indices.push_back(i); + RowGroupInfomation info; + info.index = i; + info.num_rows = row_group_meta->num_rows(); + info.start = row_group_meta->file_offset(); + info.total_compressed_size = row_group_meta->total_compressed_size(); + info.total_size = row_group_meta->total_byte_size(); + row_group_metadatas.emplace_back(info); } } - return indices; + return row_group_metadatas; + } } diff --git a/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.h b/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.h index 839ec63005b1..d26e5a28ac39 100644 --- a/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.h +++ b/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.h @@ -2,8 +2,18 @@ #include #include #include +#include +#include namespace local_engine { +struct RowGroupInfomation +{ + UInt32 index = 0; + UInt64 start = 0; + UInt64 total_compressed_size = 0; + UInt64 total_size = 0; + UInt64 num_rows = 0; +}; class ParquetFormatFile : public FormatFile { public: @@ -17,10 +27,8 @@ class ParquetFormatFile : public FormatFile std::mutex mutex; std::optional total_rows; - std::unique_ptr reader; - void prepareReader(); - - std::vector collectRowGroupIndices(); + std::vector collectRequiredRowGroups(); + std::vector collectRequiredRowGroups(DB::ReadBuffer * read_buffer); }; }