Skip to content

Commit

Permalink
refactor the file sources (#130)
Browse files Browse the repository at this point in the history
* refactor the file sources

refactor the file source

1. support reading tables with multiple partition columns
2. support reading files with different partition values
3. support only reading partition columns from tables

* rename files

* remove unused code

* fixed: when a file is split into parts, its data is loaded more than once

* support reading split parquet files

* throw an exception on empty columns to read

* fixed compile errors

* fixed a bug in building nullable columns from const columns

* try to fix compile errors

* fixed a bug in creating partition columns

* delete BatchParquetFileSource

* fixed code style
  • Loading branch information
lgbo-ustc authored Sep 30, 2022
1 parent 3d1db97 commit 198ba4e
Show file tree
Hide file tree
Showing 21 changed files with 1,000 additions and 315 deletions.
3 changes: 3 additions & 0 deletions utils/local-engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ include_directories(
)

add_subdirectory(Storages/ch_parquet)
add_subdirectory(Storages/SubstraitSource)

add_library(${LOCALENGINE_SHARED_LIB} SHARED
${builder_sources}
Expand All @@ -39,6 +40,7 @@ add_library(${LOCALENGINE_SHARED_LIB} SHARED
${external_sources}
${shuffle_sources}
${jni_sources}
${substrait_source}
${operator_sources}
local_engine_jni.cpp)

Expand All @@ -55,6 +57,7 @@ target_link_libraries(${LOCALENGINE_SHARED_LIB} PUBLIC
substrait
loggers
ch_parquet
substait_source
)

# if (USE_LIBCXX)
Expand Down
35 changes: 6 additions & 29 deletions utils/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/BatchParquetFileSource.h>
#include <Storages/CustomStorageMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/StorageMergeTreeFactory.h>
Expand All @@ -42,6 +41,8 @@
#include <Common/MergeTreeTool.h>
#include <Common/StringUtils.h>
#include <google/protobuf/util/json_util.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>

#include "SerializedPlanParser.h"

namespace DB
Expand All @@ -53,7 +54,6 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int NO_SUCH_DATA_PART;
extern const int UNKNOWN_FUNCTION;
extern const int NOT_IMPLEMENTED;
}
}

Expand Down Expand Up @@ -200,35 +200,12 @@ QueryPlanPtr SerializedPlanParser::parseReadRealWithLocalFile(const substrait::R
{
assert(rel.has_local_files());
assert(rel.has_base_schema());
auto files_info = std::make_shared<FilesInfo>();
for (const auto & item : rel.local_files().items())
{
files_info->files.push_back(item.uri_file());
}
auto header = parseNameStruct(rel.base_schema());
PartitionValues partition_values = StringUtils::parsePartitionTablePath(files_info->files[0]);
if (partition_values.size() > 1)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "doesn't support multiple level partition.");
}
ProcessorPtr partition_transform;
if (!partition_values.empty())
{
auto origin_header = header.cloneEmpty();
PartitionValue partition_value = partition_values[0];
header.erase(partition_value.first);
partition_transform
= std::make_shared<PartitionColumnFillingTransform>(header, origin_header, partition_value.first, partition_value.second);
}
auto query_plan = std::make_unique<QueryPlan>();
std::shared_ptr<IProcessor> source = std::make_shared<BatchParquetFileSource>(files_info, header, context);
auto source = std::make_shared<SubstraitFileSource>(context, header, rel.local_files());
auto source_pipe = Pipe(source);
if (partition_transform)
{
source_pipe.addTransform(partition_transform);
}
auto source_step = std::make_unique<ReadFromStorageStep>(std::move(source_pipe), "Parquet");
source_step->setStepDescription("Read Parquet");
auto source_step = std::make_unique<ReadFromStorageStep>(std::move(source_pipe), "substrait local files");
source_step->setStepDescription("read local files");
auto query_plan = std::make_unique<QueryPlan>();
query_plan->addStep(std::move(source_step));
return query_plan;
}
Expand Down
10 changes: 7 additions & 3 deletions utils/local-engine/Storages/ArrowParquetBlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ using namespace DB;
namespace local_engine
{
ArrowParquetBlockInputFormat::ArrowParquetBlockInputFormat(
DB::ReadBuffer & in_, const DB::Block & header, const DB::FormatSettings & formatSettings)
DB::ReadBuffer & in_, const DB::Block & header, const DB::FormatSettings & formatSettings, const std::vector<int> & row_group_indices_)
: OptimizedParquetBlockInputFormat(in_, header, formatSettings)
, row_group_indices(row_group_indices_)
{
}

Expand Down Expand Up @@ -74,8 +75,11 @@ DB::Chunk ArrowParquetBlockInputFormat::generate()
}
index += indexes_count;
}
auto row_group_range = boost::irange(0, file_reader->num_row_groups());
auto row_group_indices = std::vector(row_group_range.begin(), row_group_range.end());
if (row_group_indices.empty())
{
auto row_group_range = boost::irange(0, file_reader->num_row_groups());
row_group_indices = std::vector(row_group_range.begin(), row_group_range.end());
}
auto read_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &current_record_batch_reader);
if (!read_status.ok())
throw std::runtime_error{"Error while reading Parquet data: " + read_status.ToString()};
Expand Down
4 changes: 3 additions & 1 deletion utils/local-engine/Storages/ArrowParquetBlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ namespace local_engine
class ArrowParquetBlockInputFormat : public DB::OptimizedParquetBlockInputFormat
{
public:
ArrowParquetBlockInputFormat(DB::ReadBuffer & in, const DB::Block & header, const DB::FormatSettings & formatSettings);
ArrowParquetBlockInputFormat(DB::ReadBuffer & in, const DB::Block & header, const DB::FormatSettings & formatSettings, const std::vector<int> & row_group_indices_ = {});
//virtual ~ArrowParquetBlockInputFormat();

private:
DB::Chunk generate() override;

int64_t convert_time = 0;
int64_t non_convert_time = 0;
std::shared_ptr<arrow::RecordBatchReader> current_record_batch_reader;
std::vector<int> row_group_indices;
};

}
162 changes: 0 additions & 162 deletions utils/local-engine/Storages/BatchParquetFileSource.cpp

This file was deleted.

81 changes: 0 additions & 81 deletions utils/local-engine/Storages/BatchParquetFileSource.h

This file was deleted.

Loading

0 comments on commit 198ba4e

Please sign in to comment.