Skip to content

Commit

Permalink
Improve: reducing the open operation for the same file when reading p…
Browse files Browse the repository at this point in the history
…arquet files (#213)

* Optimization for reading Parquet files: reduce the number of open operations on the same file

* update
  • Loading branch information
lgbo-ustc authored Dec 12, 2022
1 parent 79b4286 commit 0a7ccba
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 29 deletions.
5 changes: 0 additions & 5 deletions utils/local-engine/Storages/SubstraitSource/FormatFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,7 @@ class FormatFile
// Bundles an input format with the read buffer it reads from, so that both
// can be handed around (and kept alive) as a single object.
//
// NOTE(review): this span is a rendered diff without +/- markers. The file
// header reports "0 additions & 5 deletions"; the removed lines appear to be
// the explicit constructor and the `private:` label, leaving a plain struct
// whose two members are assigned directly by the caller.
struct InputFormat
{
public:
// Stores the input format and takes ownership of the buffer it reads from.
explicit InputFormat(DB::InputFormatPtr input_, std::unique_ptr<DB::ReadBuffer> read_buffer_)
: input(input_), read_buffer(std::move(read_buffer_))
{
}
// The format that produces blocks.
// NOTE(review): `input` is declared before `read_buffer`, so `read_buffer`
// is destroyed first. Confirm the input format never touches the buffer
// from its destructor.
DB::InputFormatPtr input;
private:
// Owning handle of the underlying read buffer.
std::unique_ptr<DB::ReadBuffer> read_buffer;
};
using InputFormatPtr = std::shared_ptr<InputFormat>;
Expand Down
63 changes: 43 additions & 20 deletions utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <utility>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <IO/SeekableReadBuffer.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Storages/ArrowParquetBlockInputFormat.h>
#include <Storages/SubstraitSource/ParquetFormatFile.h>
Expand All @@ -24,11 +25,28 @@ ParquetFormatFile::ParquetFormatFile(DB::ContextPtr context_, const substrait::R

// Builds the input format for this file: opens a read buffer, works out which
// row groups have to be read, and wraps the buffer in an
// ArrowParquetBlockInputFormat restricted to those row groups.
//
// header: passed through unchanged to ArrowParquetBlockInputFormat.
// Returns a shared InputFormat holding both the format and its read buffer.
//
// NOTE(review): this span is a rendered diff without +/- markers, so old and
// new lines are interleaved. The lines using collectRowGroupIndices(), the
// local `read_buffer`, and the second declarations of `input_format` / `res`
// appear to be the pre-change version.
FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(const DB::Block & header)
{
auto row_group_indices = collectRowGroupIndices();
auto read_buffer = read_buffer_builder->build(file_info);
auto res = std::make_shared<FormatFile::InputFormat>();
// NOTE(review): if build() returns by value, this std::move is redundant — confirm.
res->read_buffer = std::move(read_buffer_builder->build(file_info));
std::vector<int> row_group_indices;
std::vector<RowGroupInfomation> required_row_groups;
if (auto * seekable_in = dynamic_cast<DB::SeekableReadBufferWithSize *>(res->read_buffer.get()))
{
// Reuse the read buffer for the metadata pass to avoid opening the file twice;
// opening an HDFS file is especially expensive.
required_row_groups = collectRequiredRowGroups(seekable_in);
// Rewind so the input format created below starts reading from the beginning.
seekable_in->seek(0, SEEK_SET);
}
else
{
// The buffer is not a SeekableReadBufferWithSize: fall back to the overload
// that builds a separate buffer for the metadata pass.
required_row_groups = collectRequiredRowGroups();
}
// Only the row-group indices are forwarded to the input format.
// NOTE(review): `index` is declared UInt32 while the vector holds int, so
// this is an implicit integer conversion.
for (const auto & row_group : required_row_groups)
{
row_group_indices.emplace_back(row_group.index);
}
auto format_settings = DB::getFormatSettings(context);
auto input_format = std::make_shared<local_engine::ArrowParquetBlockInputFormat>(*read_buffer, header, format_settings, row_group_indices);
auto res = std::make_shared<FormatFile::InputFormat>(input_format, std::move(read_buffer));
// The input format takes the buffer by reference; `res` owns the buffer itself.
auto input_format = std::make_shared<local_engine::ArrowParquetBlockInputFormat>(*(res->read_buffer), header, format_settings, row_group_indices);
res->input = input_format;
return res;
}

Expand All @@ -39,12 +57,11 @@ std::optional<size_t> ParquetFormatFile::getTotalRows()
if (total_rows)
return total_rows;
}
auto row_group_indices = collectRowGroupIndices();
auto rowgroups = collectRequiredRowGroups();
size_t rows = 0;
auto file_meta = reader->parquet_reader()->metadata();
for (auto i : row_group_indices)
for (const auto & rowgroup : rowgroups)
{
rows += file_meta->RowGroup(i)->num_rows();
rows += rowgroup.num_rows;
}
{
std::lock_guard lock(mutex);
Expand All @@ -53,28 +70,27 @@ std::optional<size_t> ParquetFormatFile::getTotalRows()
}
}

// Convenience overload: builds a fresh read buffer via read_buffer_builder
// and delegates to collectRequiredRowGroups(DB::ReadBuffer *). The buffer is a
// local, so it is released when this function returns.
//
// NOTE(review): this span is a rendered diff without +/- markers. The
// `prepareReader()` signature, the lock_guard and the `if (reader) return;`
// lines appear to belong to the removed prepareReader() implementation.
void ParquetFormatFile::prepareReader()
std::vector<RowGroupInfomation> ParquetFormatFile::collectRequiredRowGroups()
{
std::lock_guard lock(mutex);
if (reader)
return;
auto in = read_buffer_builder->build(file_info);
return collectRequiredRowGroups(in.get());
}

// Opens the parquet file behind `read_buffer`, walks its row-group metadata
// and returns one RowGroupInfomation per row group selected for this split.
//
// read_buffer: buffer to read the parquet metadata from; it is dereferenced
//              without a null check, so callers must pass a valid pointer.
// Throws DB::Exception (BAD_ARGUMENTS) when parquet::arrow::OpenFile fails.
//
// NOTE(review): this span is a rendered diff without +/- markers and contains
// a hidden-lines hunk break. The `*in` argument line, the stray closing brace,
// the collectRowGroupIndices() signature, `prepareReader();`,
// `std::vector<int> indices;`, `indices.push_back(i);` and `return indices;`
// appear to be pre-change lines.
std::vector<RowGroupInfomation> ParquetFormatFile::collectRequiredRowGroups(DB::ReadBuffer * read_buffer)
{
// The reader is local to this call and is destroyed on return.
std::unique_ptr<parquet::arrow::FileReader> reader;
DB::FormatSettings format_settings;
format_settings.seekable_read = true;
std::atomic<int> is_stopped{0};
auto status = parquet::arrow::OpenFile(
asArrowFile(*in, format_settings, is_stopped), arrow::default_memory_pool(), &reader);
asArrowFile(*read_buffer, format_settings, is_stopped), arrow::default_memory_pool(), &reader);
if (!status.ok())
{
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Open file({}) failed. {}", file_info.uri_file(), status.ToString());
}
}

std::vector<int> ParquetFormatFile::collectRowGroupIndices()
{
prepareReader();
auto file_meta = reader->parquet_reader()->metadata();
std::vector<int> indices;
std::vector<RowGroupInfomation> row_group_metadatas;
for (int i = 0, n = file_meta->num_row_groups(); i < n; ++i)
{
auto row_group_meta = file_meta->RowGroup(i);
// The lines that compute `offset` are hidden behind the hunk break below.
Expand All @@ -85,9 +101,16 @@ std::vector<int> ParquetFormatFile::collectRowGroupIndices()
}
// Keep the row group only when `offset` lies in the half-open range
// [file_info.start(), file_info.start() + file_info.length()).
if (file_info.start() <= offset && offset < file_info.start() + file_info.length())
{
indices.push_back(i);
// Copy the metadata fields of the selected row group.
RowGroupInfomation info;
info.index = i;
info.num_rows = row_group_meta->num_rows();
info.start = row_group_meta->file_offset();
info.total_compressed_size = row_group_meta->total_compressed_size();
info.total_size = row_group_meta->total_byte_size();
row_group_metadatas.emplace_back(info);
}
}
return indices;
return row_group_metadatas;

}
}
16 changes: 12 additions & 4 deletions utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,18 @@
#include <memory>
#include <Storages/SubstraitSource/FormatFile.h>
#include <parquet/arrow/reader.h>
#include <IO/ReadBuffer.h>
#include <base/types.h>
namespace local_engine
{
/// Metadata of a single parquet row group selected for reading.
/// Every field defaults to zero.
struct RowGroupInfomation
{
    /// Position of the row group in the file metadata (argument to RowGroup(i)).
    UInt32 index{0};
    /// Value of the row group's file_offset().
    UInt64 start{0};
    /// Value of the row group's total_compressed_size().
    UInt64 total_compressed_size{0};
    /// Value of the row group's total_byte_size().
    UInt64 total_size{0};
    /// Number of rows stored in the row group.
    UInt64 num_rows{0};
};
// FormatFile implementation for parquet files.
//
// NOTE(review): this span is a rendered diff without +/- markers and the
// public section is hidden behind a hunk break. The `reader` member,
// prepareReader() and collectRowGroupIndices() appear to be the declarations
// removed by this change.
class ParquetFormatFile : public FormatFile
{
public:
Expand All @@ -17,10 +27,8 @@ class ParquetFormatFile : public FormatFile
// NOTE(review): appears to guard `total_rows` in getTotalRows() — confirm.
std::mutex mutex;
// Row count returned by getTotalRows() once it has been set.
std::optional<size_t> total_rows;

std::unique_ptr<parquet::arrow::FileReader> reader;
void prepareReader();

std::vector<int> collectRowGroupIndices();
// Builds its own read buffer, then delegates to the overload below.
std::vector<RowGroupInfomation> collectRequiredRowGroups();
// Reads the parquet metadata through `read_buffer` and returns the selected row groups.
std::vector<RowGroupInfomation> collectRequiredRowGroups(DB::ReadBuffer * read_buffer);
};

}

0 comments on commit 0a7ccba

Please sign in to comment.