Solve conflict symbols of DB::ParquetBlockInputFormat and add some benchmark of optimized parquet reading (#121)

* add benchmark for optimized parquet format

* commit again

* commit again
taiyang-li authored Sep 21, 2022
1 parent 0889631 commit c5ba2bf
Showing 9 changed files with 177 additions and 47 deletions.
12 changes: 2 additions & 10 deletions utils/local-engine/Storages/ArrowParquetBlockInputFormat.cpp
@@ -6,15 +6,15 @@
#include <boost/range/irange.hpp>
#include <DataTypes/NestedUtils.h>

#include "ch_parquet/ArrowColumnToCHColumn.h"
#include "ch_parquet/OptimizedArrowColumnToCHColumn.h"

using namespace DB;

namespace local_engine
{
ArrowParquetBlockInputFormat::ArrowParquetBlockInputFormat(
DB::ReadBuffer & in_, const DB::Block & header, const DB::FormatSettings & formatSettings)
: ParquetBlockInputFormat(in_, header, formatSettings)
: OptimizedParquetBlockInputFormat(in_, header, formatSettings)
{
}

@@ -111,13 +111,5 @@ DB::Chunk ArrowParquetBlockInputFormat::generate()
block_missing_values.setBit(column_idx, row_idx);
return res;
}
//ArrowParquetBlockInputFormat::~ArrowParquetBlockInputFormat()
//{
// std::cerr<<"convert time: " << convert_time / 1000000.0 <<" ms"<<std::endl;
// std::cerr<<"non-convert time: " << non_convert_time / 1000000.0 <<" ms"<<std::endl;
// std::cerr<<"convert/non-convert " << 1.0 * convert_time / non_convert_time <<" "<<std::endl;
// std::cerr<<"real convert " << 1.0 * arrow_column_to_ch_column->real_convert/ 1000000.0 <<" ms"<<std::endl;
// std::cerr<<"cast time " << 1.0 * arrow_column_to_ch_column->cast_time/ 1000000.0 <<" ms"<<std::endl;
//}

}
7 changes: 3 additions & 4 deletions utils/local-engine/Storages/ArrowParquetBlockInputFormat.h
@@ -1,8 +1,8 @@
#pragma once

#include <Common/ChunkBuffer.h>
#include "ch_parquet/ArrowColumnToCHColumn.h"
#include "ch_parquet/ParquetBlockInputFormat.h"
#include "ch_parquet/OptimizedParquetBlockInputFormat.h"
#include "ch_parquet/OptimizedArrowColumnToCHColumn.h"
#include "ch_parquet/arrow/reader.h"

namespace arrow
@@ -13,11 +13,10 @@ class Table;

namespace local_engine
{
class ArrowParquetBlockInputFormat : public DB::ParquetBlockInputFormat
class ArrowParquetBlockInputFormat : public DB::OptimizedParquetBlockInputFormat
{
public:
ArrowParquetBlockInputFormat(DB::ReadBuffer & in, const DB::Block & header, const DB::FormatSettings & formatSettings);
//virtual ~ArrowParquetBlockInputFormat();

private:
DB::Chunk generate() override;
@@ -1,4 +1,4 @@
#include "ArrowColumnToCHColumn.h"
#include "OptimizedArrowColumnToCHColumn.h"

#if USE_ARROW || USE_ORC || USE_PARQUET

@@ -498,7 +498,7 @@ static void checkStatus(const arrow::Status & status, const String & column_name
throw Exception{ErrorCodes::UNKNOWN_EXCEPTION, "Error with a {} column '{}': {}.", format_name, column_name, status.ToString()};
}

Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(const arrow::Schema & schema, const std::string & format_name)
Block OptimizedArrowColumnToCHColumn::arrowSchemaToCHHeader(const arrow::Schema & schema, const std::string & format_name)
{
ColumnsWithTypeAndName sample_columns;
for (const auto & field : schema.fields())
@@ -523,13 +523,13 @@ Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(const arrow::Schema & schema,
return Block(std::move(sample_columns));
}

ArrowColumnToCHColumn::ArrowColumnToCHColumn(
OptimizedArrowColumnToCHColumn::OptimizedArrowColumnToCHColumn(
const Block & header_, const std::string & format_name_, bool import_nested_, bool allow_missing_columns_)
: header(header_), format_name(format_name_), import_nested(import_nested_), allow_missing_columns(allow_missing_columns_)
{
}

void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
void OptimizedArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
{
NameToColumnPtr name_to_column_ptr;
for (const auto & column_name : table->ColumnNames())
@@ -546,7 +546,7 @@ void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arr
real_convert += sw.elapsedNanoseconds();
}

void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr)
void OptimizedArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr)
{
if (unlikely(name_to_column_ptr.empty()))
throw Exception(ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS, "Columns is empty");
@@ -620,7 +620,7 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
res.setColumns(columns_list, num_rows);
}

std::vector<size_t> ArrowColumnToCHColumn::getMissingColumns(const arrow::Schema & schema) const
std::vector<size_t> OptimizedArrowColumnToCHColumn::getMissingColumns(const arrow::Schema & schema) const
{
std::vector<size_t> missing_columns;
auto block_from_arrow = arrowSchemaToCHHeader(schema, format_name);
@@ -17,12 +17,12 @@ namespace DB
class Block;
class Chunk;

class ArrowColumnToCHColumn
class OptimizedArrowColumnToCHColumn
{
public:
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;

ArrowColumnToCHColumn(
OptimizedArrowColumnToCHColumn(
const Block & header_,
const std::string & format_name_,
bool import_nested_,
@@ -1,4 +1,4 @@
#include "ParquetBlockInputFormat.h"
#include "OptimizedParquetBlockInputFormat.h"
#include <boost/algorithm/string/case_conv.hpp>

#if USE_PARQUET
@@ -12,7 +12,7 @@
#include "Storages/ch_parquet/arrow/reader.h"
#include <parquet/file_reader.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include "ArrowColumnToCHColumn.h"
#include "OptimizedArrowColumnToCHColumn.h"
#include <DataTypes/NestedUtils.h>

namespace DB
@@ -31,12 +31,12 @@ namespace ErrorCodes
throw Exception(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \
} while (false)

ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
OptimizedParquetBlockInputFormat::OptimizedParquetBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
: IInputFormat(std::move(header_), in_), format_settings(format_settings_)
{
}

Chunk ParquetBlockInputFormat::generate()
Chunk OptimizedParquetBlockInputFormat::generate()
{
Chunk res;
block_missing_values.clear();
@@ -72,7 +72,7 @@ Chunk ParquetBlockInputFormat::generate()
return res;
}

void ParquetBlockInputFormat::resetParser()
void OptimizedParquetBlockInputFormat::resetParser()
{
IInputFormat::resetParser();

@@ -83,7 +83,7 @@ void ParquetBlockInputFormat::resetParser()
block_missing_values.clear();
}

const BlockMissingValues & ParquetBlockInputFormat::getMissingValues() const
const BlockMissingValues & OptimizedParquetBlockInputFormat::getMissingValues() const
{
return block_missing_values;
}
@@ -139,7 +139,7 @@ static void getFileReaderAndSchema(
}
}

void ParquetBlockInputFormat::prepareReader()
void OptimizedParquetBlockInputFormat::prepareReader()
{
std::shared_ptr<arrow::Schema> schema;
getFileReaderAndSchema(*in, file_reader, schema, format_settings, is_stopped);
@@ -149,7 +149,7 @@ void ParquetBlockInputFormat::prepareReader()
row_group_total = file_reader->num_row_groups();
row_group_current = 0;

arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(getPort().getHeader(), "Parquet", format_settings.parquet.import_nested, format_settings.parquet.allow_missing_columns);
arrow_column_to_ch_column = std::make_unique<OptimizedArrowColumnToCHColumn>(getPort().getHeader(), "Parquet", format_settings.parquet.import_nested, format_settings.parquet.allow_missing_columns);
missing_columns = arrow_column_to_ch_column->getMissingColumns(*schema);

std::unordered_set<String> nested_table_names;
@@ -176,17 +176,17 @@
}
}

ParquetSchemaReader::ParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_) : ISchemaReader(in_), format_settings(format_settings_)
OptimizedParquetSchemaReader::OptimizedParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_) : ISchemaReader(in_), format_settings(format_settings_)
{
}

NamesAndTypesList ParquetSchemaReader::readSchema()
NamesAndTypesList OptimizedParquetSchemaReader::readSchema()
{
std::unique_ptr<ch_parquet::arrow::FileReader> file_reader;
std::shared_ptr<arrow::Schema> schema;
std::atomic<int> is_stopped = 0;
getFileReaderAndSchema(in, file_reader, schema, format_settings, is_stopped);
auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(*schema, "Parquet");
auto header = OptimizedArrowColumnToCHColumn::arrowSchemaToCHHeader(*schema, "Parquet");
return header.getNamesAndTypesList();
}

@@ -199,18 +199,18 @@ void registerInputFormatParquet(FormatFactory & factory)
const RowInputFormatParams &,
const FormatSettings & settings)
{
return std::make_shared<ParquetBlockInputFormat>(buf, sample, settings);
return std::make_shared<OptimizedParquetBlockInputFormat>(buf, sample, settings);
});
factory.markFormatAsColumnOriented("Parquet");
}

void registerParquetSchemaReader(FormatFactory & factory)
void registerOptimizedParquetSchemaReader(FormatFactory & factory)
{
factory.registerSchemaReader(
"Parquet",
[](ReadBuffer & buf, const FormatSettings & settings, ContextPtr)
{
return std::make_shared<ParquetSchemaReader>(buf, settings);
return std::make_shared<OptimizedParquetSchemaReader>(buf, settings);
}
);
}
@@ -226,7 +226,7 @@ void registerInputFormatParquet(FormatFactory &)
{
}

void registerParquetSchemaReader(FormatFactory &) {}
void registerOptimizedParquetSchemaReader(FormatFactory &) {}
}

#endif
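
Note: the registration above keeps the public format name "Parquet" but now constructs the renamed OptimizedParquetBlockInputFormat, so local-engine's copy no longer collides with upstream's DB::ParquetBlockInputFormat symbol. A minimal sketch of a call site follows; the FormatFactory::getInput signature and the wiring are assumptions about the ClickHouse version this repository builds against, not code from this commit.

```cpp
// Sketch only: assumes ClickHouse's FormatFactory::getInput of that era and that
// the "Parquet" registration shown above has been performed.
#include <Core/Block.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadBuffer.h>
#include <Processors/Formats/IInputFormat.h>

DB::InputFormatPtr openParquetInput(DB::ReadBuffer & in, const DB::Block & header, DB::ContextPtr context)
{
    // After the rename, "Parquet" resolves to DB::OptimizedParquetBlockInputFormat here,
    // while upstream ClickHouse can still define its own DB::ParquetBlockInputFormat.
    return DB::FormatFactory::instance().getInput("Parquet", in, header, context, /*max_block_size=*/8192);
}
```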
@@ -13,16 +13,16 @@ namespace arrow { class Buffer; }
namespace DB
{

class ArrowColumnToCHColumn;
class OptimizedArrowColumnToCHColumn;

class ParquetBlockInputFormat : public IInputFormat
class OptimizedParquetBlockInputFormat : public IInputFormat
{
public:
ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_);
OptimizedParquetBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_);

void resetParser() override;

String getName() const override { return "ParquetBlockInputFormat"; }
String getName() const override { return "OptimizedParquetBlockInputFormat"; }

const BlockMissingValues & getMissingValues() const override;

@@ -42,7 +42,7 @@ class ParquetBlockInputFormat : public IInputFormat
// indices of columns to read from Parquet file
std::vector<int> column_indices;
std::vector<String> column_names;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
std::unique_ptr<OptimizedArrowColumnToCHColumn> arrow_column_to_ch_column;
int row_group_current = 0;
std::vector<size_t> missing_columns;
BlockMissingValues block_missing_values;
@@ -51,10 +51,10 @@
std::atomic<int> is_stopped{0};
};

class ParquetSchemaReader : public ISchemaReader
class OptimizedParquetSchemaReader : public ISchemaReader
{
public:
ParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_);
OptimizedParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_);

NamesAndTypesList readSchema() override;

4 changes: 2 additions & 2 deletions utils/local-engine/tests/CMakeLists.txt
@@ -30,7 +30,7 @@ grep_gtest_sources("${ClickHouse_SOURCE_DIR}/utils/local_engine/tests" local_eng

add_executable(unit_tests_local_engine ${local_engine_gtest_sources} )

add_executable(benchmark_local_engine benchmark_local_engine.cpp)
add_executable(benchmark_local_engine benchmark_local_engine.cpp benchmark_parquet_read.cpp)

target_compile_options(unit_tests_local_engine PRIVATE
-Wno-error
@@ -47,7 +47,7 @@ target_compile_options(benchmark PUBLIC
target_include_directories(unit_tests_local_engine PRIVATE
${GTEST_INCLUDE_DIRS}/include
)
include_directories(benchmark_local_engine SYSTEM PUBLIC ${FETCH_CONTENT_SOURCE_DIR_GOOGLEBENCHMARK}/include)
include_directories(benchmark_local_engine SYSTEM PUBLIC ${FETCH_CONTENT_SOURCE_DIR_GOOGLEBENCHMARK}/include ${ClickHouse_SOURCE_DIR}/utils/local_engine)

target_link_libraries(unit_tests_local_engine PRIVATE ${LOCALENGINE_SHARED_LIB} _gtest_all)
target_link_libraries(benchmark_local_engine PRIVATE ${LOCALENGINE_SHARED_LIB} benchmark::benchmark)
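
The CMake change above adds a new benchmark_parquet_read.cpp to the benchmark_local_engine target, but the new file itself is not shown in this diff. As a rough illustration of what such a Google Benchmark case could look like, here is a sketch that times a plain Arrow Parquet read; the file path, the Arrow-only read path, and all names below are assumptions, not the repository's actual benchmark.

```cpp
// Illustrative sketch only; not the contents of benchmark_parquet_read.cpp.
// Assumes Apache Arrow's C++ Parquet API and a hypothetical local test file.
#include <benchmark/benchmark.h>
#include <arrow/io/file.h>
#include <arrow/memory_pool.h>
#include <arrow/table.h>
#include <parquet/arrow/reader.h>

static void BM_ArrowParquetRead(benchmark::State & state)
{
    const std::string path = "/tmp/lineitem.snappy.parquet"; // hypothetical input file

    for (auto _ : state)
    {
        // Open the file and build a parquet::arrow reader.
        auto input = arrow::io::ReadableFile::Open(path).ValueOrDie();
        std::unique_ptr<parquet::arrow::FileReader> reader;
        if (!parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &reader).ok())
        {
            state.SkipWithError("failed to open parquet file");
            break;
        }

        // Read the whole file into an arrow::Table and keep the result alive
        // so the read cannot be optimized away.
        std::shared_ptr<arrow::Table> table;
        if (!reader->ReadTable(&table).ok())
        {
            state.SkipWithError("failed to read parquet table");
            break;
        }
        benchmark::DoNotOptimize(table->num_rows());
    }
}
BENCHMARK(BM_ArrowParquetRead);
// BENCHMARK_MAIN() is assumed to live in benchmark_local_engine.cpp already.
```

The real benchmark_parquet_read.cpp presumably exercises the OptimizedParquetBlockInputFormat path as well; this sketch only shows the Google Benchmark scaffolding around a baseline Arrow read to compare against.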
1 change: 0 additions & 1 deletion utils/local-engine/tests/benchmark_local_engine.cpp
@@ -172,7 +172,6 @@ DB::ContextMutablePtr global_context;
files->files = {
"file:///home/hongbin/code/gluten/jvm/src/test/resources/tpch-data/lineitem/"
"part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet",

};
auto builder = std::make_unique<QueryPipelineBuilder>();
builder->init(Pipe(std::make_shared<BatchParquetFileSource>(files, header, SerializedPlanParser::global_context)));