From 198ba4e91a179873247313cf72d3fabf5c2ba42f Mon Sep 17 00:00:00 2001 From: lgbo Date: Fri, 30 Sep 2022 10:21:40 +0800 Subject: [PATCH] refactor the file sources (#130) * refactor the file sources 1. support reading tables with multiple partition columns 2. support reading files with different partition values 3. support reading only partition columns from tables * rename files * remove unused code * fixed: data was loaded in duplicate when a file is split into parts * support reading split parquet files * throw an exception on empty columns to read * fixed compile errors * fixed a bug in building nullable columns from const columns * try to fix compile bugs * fixed a bug in creating partition columns * delete BatchParquetFileSource * fixed code style --- utils/local-engine/CMakeLists.txt | 3 + .../Parser/SerializedPlanParser.cpp | 35 +-- .../Storages/ArrowParquetBlockInputFormat.cpp | 10 +- .../Storages/ArrowParquetBlockInputFormat.h | 4 +- .../Storages/BatchParquetFileSource.cpp | 162 ---------- .../Storages/BatchParquetFileSource.h | 81 ----- .../Storages/SubstraitSource/CMakeLists.txt | 45 +++ .../Storages/SubstraitSource/FormatFile.cpp | 51 +++ .../Storages/SubstraitSource/FormatFile.h | 71 +++++ .../SubstraitSource/ParquetFormatFile.cpp | 91 ++++++ .../SubstraitSource/ParquetFormatFile.h | 26 ++ .../SubstraitSource/ReadBufferBuilder.cpp | 193 ++++++++++++ .../SubstraitSource/ReadBufferBuilder.h | 38 +++ .../SubstraitSource/SubstraitFileSource.cpp | 294 ++++++++++++++++++ .../SubstraitSource/SubstraitFileSource.h | 96 ++++++ utils/local-engine/local_engine_jni.cpp | 2 + .../tests/benchmark_local_engine.cpp | 18 +- .../tests/benchmark_parquet_read.cpp | 25 +- utils/local-engine/tests/gtest_ch_join.cpp | 3 +- .../local-engine/tests/gtest_ch_storages.cpp | 53 ++-- .../local-engine/tests/gtest_local_engine.cpp | 14 +- 21 files changed, 1000 insertions(+), 315 deletions(-) delete mode 100644 utils/local-engine/Storages/BatchParquetFileSource.cpp delete mode 100644 utils/local-engine/Storages/BatchParquetFileSource.h create mode 100644 utils/local-engine/Storages/SubstraitSource/CMakeLists.txt create mode 100644 utils/local-engine/Storages/SubstraitSource/FormatFile.cpp create mode 100644 utils/local-engine/Storages/SubstraitSource/FormatFile.h create mode 100644 utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp create mode 100644 utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.h create mode 100644 utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp create mode 100644 utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h create mode 100644 utils/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp create mode 100644 utils/local-engine/Storages/SubstraitSource/SubstraitFileSource.h diff --git a/utils/local-engine/CMakeLists.txt b/utils/local-engine/CMakeLists.txt index 28ae0902247c..f4bbe99c835f 100644 --- a/utils/local-engine/CMakeLists.txt +++ b/utils/local-engine/CMakeLists.txt @@ -30,6 +30,7 @@ include_directories( ) add_subdirectory(Storages/ch_parquet) +add_subdirectory(Storages/SubstraitSource) add_library(${LOCALENGINE_SHARED_LIB} SHARED ${builder_sources} @@ -39,6 +40,7 @@ add_library(${LOCALENGINE_SHARED_LIB} SHARED ${external_sources} ${shuffle_sources} ${jni_sources} + ${substrait_source} ${operator_sources} local_engine_jni.cpp) @@ -55,6 +57,7 @@ target_link_libraries(${LOCALENGINE_SHARED_LIB} PUBLIC substrait loggers ch_parquet + substait_source ) # if (USE_LIBCXX) diff --git
a/utils/local-engine/Parser/SerializedPlanParser.cpp b/utils/local-engine/Parser/SerializedPlanParser.cpp index 558bcc55fb2f..7d32c01d2a89 100644 --- a/utils/local-engine/Parser/SerializedPlanParser.cpp +++ b/utils/local-engine/Parser/SerializedPlanParser.cpp @@ -31,7 +31,6 @@ #include #include #include -#include #include #include #include @@ -42,6 +41,8 @@ #include #include #include +#include + #include "SerializedPlanParser.h" namespace DB @@ -53,7 +54,6 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; extern const int NO_SUCH_DATA_PART; extern const int UNKNOWN_FUNCTION; - extern const int NOT_IMPLEMENTED; } } @@ -200,35 +200,12 @@ QueryPlanPtr SerializedPlanParser::parseReadRealWithLocalFile(const substrait::R { assert(rel.has_local_files()); assert(rel.has_base_schema()); - auto files_info = std::make_shared(); - for (const auto & item : rel.local_files().items()) - { - files_info->files.push_back(item.uri_file()); - } auto header = parseNameStruct(rel.base_schema()); - PartitionValues partition_values = StringUtils::parsePartitionTablePath(files_info->files[0]); - if (partition_values.size() > 1) - { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "doesn't support multiple level partition."); - } - ProcessorPtr partition_transform; - if (!partition_values.empty()) - { - auto origin_header = header.cloneEmpty(); - PartitionValue partition_value = partition_values[0]; - header.erase(partition_value.first); - partition_transform - = std::make_shared(header, origin_header, partition_value.first, partition_value.second); - } - auto query_plan = std::make_unique(); - std::shared_ptr source = std::make_shared(files_info, header, context); + auto source = std::make_shared(context, header, rel.local_files()); auto source_pipe = Pipe(source); - if (partition_transform) - { - source_pipe.addTransform(partition_transform); - } - auto source_step = std::make_unique(std::move(source_pipe), "Parquet"); - source_step->setStepDescription("Read Parquet"); + auto source_step = std::make_unique(std::move(source_pipe), "substrait local files"); + source_step->setStepDescription("read local files"); + auto query_plan = std::make_unique(); query_plan->addStep(std::move(source_step)); return query_plan; } diff --git a/utils/local-engine/Storages/ArrowParquetBlockInputFormat.cpp b/utils/local-engine/Storages/ArrowParquetBlockInputFormat.cpp index a3f3bb7b9599..1b314205a22e 100644 --- a/utils/local-engine/Storages/ArrowParquetBlockInputFormat.cpp +++ b/utils/local-engine/Storages/ArrowParquetBlockInputFormat.cpp @@ -13,8 +13,9 @@ using namespace DB; namespace local_engine { ArrowParquetBlockInputFormat::ArrowParquetBlockInputFormat( - DB::ReadBuffer & in_, const DB::Block & header, const DB::FormatSettings & formatSettings) + DB::ReadBuffer & in_, const DB::Block & header, const DB::FormatSettings & formatSettings, const std::vector & row_group_indices_) : OptimizedParquetBlockInputFormat(in_, header, formatSettings) + , row_group_indices(row_group_indices_) { } @@ -74,8 +75,11 @@ DB::Chunk ArrowParquetBlockInputFormat::generate() } index += indexes_count; } - auto row_group_range = boost::irange(0, file_reader->num_row_groups()); - auto row_group_indices = std::vector(row_group_range.begin(), row_group_range.end()); + if (row_group_indices.empty()) + { + auto row_group_range = boost::irange(0, file_reader->num_row_groups()); + row_group_indices = std::vector(row_group_range.begin(), row_group_range.end()); + } auto read_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, 
&current_record_batch_reader); if (!read_status.ok()) throw std::runtime_error{"Error while reading Parquet data: " + read_status.ToString()}; diff --git a/utils/local-engine/Storages/ArrowParquetBlockInputFormat.h b/utils/local-engine/Storages/ArrowParquetBlockInputFormat.h index 4c6809b8b300..e01741f5be8b 100644 --- a/utils/local-engine/Storages/ArrowParquetBlockInputFormat.h +++ b/utils/local-engine/Storages/ArrowParquetBlockInputFormat.h @@ -16,7 +16,8 @@ namespace local_engine class ArrowParquetBlockInputFormat : public DB::OptimizedParquetBlockInputFormat { public: - ArrowParquetBlockInputFormat(DB::ReadBuffer & in, const DB::Block & header, const DB::FormatSettings & formatSettings); + ArrowParquetBlockInputFormat(DB::ReadBuffer & in, const DB::Block & header, const DB::FormatSettings & formatSettings, const std::vector & row_group_indices_ = {}); + //virtual ~ArrowParquetBlockInputFormat(); private: DB::Chunk generate() override; @@ -24,6 +25,7 @@ class ArrowParquetBlockInputFormat : public DB::OptimizedParquetBlockInputFormat int64_t convert_time = 0; int64_t non_convert_time = 0; std::shared_ptr current_record_batch_reader; + std::vector row_group_indices; }; } diff --git a/utils/local-engine/Storages/BatchParquetFileSource.cpp b/utils/local-engine/Storages/BatchParquetFileSource.cpp deleted file mode 100644 index 9b229631d88b..000000000000 --- a/utils/local-engine/Storages/BatchParquetFileSource.cpp +++ /dev/null @@ -1,162 +0,0 @@ -#include "BatchParquetFileSource.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using namespace DB; - -namespace local_engine -{ -#if USE_AWS_S3 - -std::shared_ptr -getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context) -{ - S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration( - config.getString(config_prefix + ".region", ""), - context->getRemoteHostFilter(), - context->getGlobalContext()->getSettingsRef().s3_max_redirects); - - S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint"))); - - client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000); - client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000); - client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100); - client_configuration.endpointOverride = uri.endpoint; - - client_configuration.retryStrategy - = std::make_shared(config.getUInt(config_prefix + ".retry_attempts", 10)); - - return S3::ClientFactory::instance().create( - client_configuration, - uri.is_virtual_hosted_style, - config.getString(config_prefix + ".access_key_id", ""), - config.getString(config_prefix + ".secret_access_key", ""), - config.getString(config_prefix + ".server_side_encryption_customer_key_base64", ""), - {}, - config.getBool(config_prefix + ".use_environment_credentials", config.getBool("s3.use_environment_credentials", false)), - config.getBool(config_prefix + ".use_insecure_imds_request", config.getBool("s3.use_insecure_imds_request", false))); -} -#endif - -BatchParquetFileSource::BatchParquetFileSource(FilesInfoPtr files_info_, const DB::Block & header_, const DB::ContextPtr & context_) - : SourceWithProgress(header_), files_info(files_info_), header(header_), context(context_) -{ -} -DB::Chunk BatchParquetFileSource::generate() -{ - while (!finished_generate) - { - /// Open file
lazily on first read. This is needed to avoid too many open files from different streams. - if (!reader) - { - auto current_file = files_info->next_file_to_read.fetch_add(1); - if (current_file >= files_info->files.size()) - { - finished_generate = true; - return {}; - } - - current_path = files_info->files[current_file]; - - read_buf = getReadBufferFromFileURI(current_path); - ProcessorPtr format = std::make_shared(*read_buf, header, DB::FormatSettings()); - QueryPipelineBuilder builder; - builder.init(Pipe(format)); - pipeline = QueryPipelineBuilder::getPipeline(std::move(builder)); - - reader = std::make_unique(pipeline); - } - - Chunk chunk; - if (reader->pull(chunk)) - { - return chunk; - } - - /// Close file prematurely if stream was ended. - reader.reset(); - pipeline.reset(); - read_buf.reset(); - } - - return {}; -} -std::unique_ptr BatchParquetFileSource::getReadBufferFromFileURI(const String & file) -{ - Poco::URI file_uri(file); - const auto & schema = file_uri.getScheme(); - if (schema == "file") - { - return getReadBufferFromLocal(file_uri.getPath()); - } -#if USE_AWS_S3 - else if (schema == "s3") - { - return getReadBufferFromS3(file_uri.getHost(), file_uri.getPath().substr(1)); - } -#endif -#if USE_AZURE_BLOB_STORAGE - else if (schema == "wasb" || schema == "wasbs") - { - return getReadBufferFromBlob(file_uri.getPath().substr(1)); - } -#endif - else - { - throw std::runtime_error("unsupported schema " + schema); - } -} -std::unique_ptr BatchParquetFileSource::getReadBufferFromLocal(const String & file) -{ - std::unique_ptr read_buffer; - - struct stat file_stat - { - }; - - /// Check if file descriptor allows random reads (and reading it twice). - if (0 != stat(file.c_str(), &file_stat)) - throw std::runtime_error("Cannot stat file " + file); - - if (S_ISREG(file_stat.st_mode)) - read_buffer = std::make_unique(file); - else - read_buffer = std::make_unique(file); - return read_buffer; -} - -#if USE_AWS_S3 -std::unique_ptr BatchParquetFileSource::getReadBufferFromS3(const String & bucket, const String & key) -{ - if (!s3_client) - { - s3_client = getClient(context->getConfigRef(), "s3", context); - } - return std::make_unique(s3_client, bucket, key, 3, ReadSettings()); -} -#endif - -#if USE_AZURE_BLOB_STORAGE -std::unique_ptr BatchParquetFileSource::getReadBufferFromBlob(const String & file) -{ - if (!blob_container_client) - { - blob_container_client = getAzureBlobContainerClient(context->getConfigRef(), "blob"); - } - return std::make_unique(blob_container_client, file, 5, 5, DBMS_DEFAULT_BUFFER_SIZE); -} -#endif - -} diff --git a/utils/local-engine/Storages/BatchParquetFileSource.h b/utils/local-engine/Storages/BatchParquetFileSource.h deleted file mode 100644 index 741aab3a7306..000000000000 --- a/utils/local-engine/Storages/BatchParquetFileSource.h +++ /dev/null @@ -1,81 +0,0 @@ -#pragma once - -#if !defined(ARCADIA_BUILD) -#include -#endif - -#include -#include -#include -#include -#include -#include - -#if USE_AZURE_BLOB_STORAGE -#include -#endif - -#if USE_AWS_S3 -#include -#endif - - -namespace local_engine -{ - -struct FilesInfo -{ - std::vector files; - std::atomic next_file_to_read = 0; -}; -using FilesInfoPtr = std::shared_ptr; - -class BatchParquetFileSource : public DB::SourceWithProgress -{ -public: - BatchParquetFileSource(FilesInfoPtr files, const DB::Block & header, const DB::ContextPtr & context_); - -private: - String getName() const override - { - return "BatchParquetFileSource"; - } - - std::unique_ptr getReadBufferFromFileURI(const String &file); - - 
std::unique_ptr getReadBufferFromLocal(const String &file); - -#if USE_AWS_S3 - std::unique_ptr getReadBufferFromS3(const String &bucket, const String &key); -#endif - -#if USE_AZURE_BLOB_STORAGE - std::unique_ptr getReadBufferFromBlob(const String &file); -#endif - -protected: - DB::Chunk generate() override; - -private: - FilesInfoPtr files_info; - std::unique_ptr read_buf; - DB::QueryPipeline pipeline; - std::unique_ptr reader; - bool finished_generate = false; - std::string current_path; - DB::Block header; - DB::ContextPtr context; -#if USE_AZURE_BLOB_STORAGE - std::shared_ptr blob_container_client; -#endif - -#if USE_AWS_S3 - std::shared_ptr s3_client; -#endif - -}; - -using BatchParquetFileSourcePtr = std::shared_ptr; -} - - diff --git a/utils/local-engine/Storages/SubstraitSource/CMakeLists.txt b/utils/local-engine/Storages/SubstraitSource/CMakeLists.txt new file mode 100644 index 000000000000..75296f21e658 --- /dev/null +++ b/utils/local-engine/Storages/SubstraitSource/CMakeLists.txt @@ -0,0 +1,45 @@ + +set(ARROW_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src") + + +macro(add_headers_and_sources_including_cc prefix common_path) + add_glob(${prefix}_headers ${CMAKE_CURRENT_SOURCE_DIR} ${common_path}/*.h) + add_glob(${prefix}_sources ${common_path}/*.cpp ${common_path}/*.c ${common_path}/*.cc ${common_path}/*.h) +endmacro() + +add_headers_and_sources(substait_source .) +add_headers_and_sources_including_cc(ch_parquet arrow) +add_library(substait_source ${substait_source_sources}) +target_compile_options(substait_source PUBLIC -fPIC + -Wno-shadow-field-in-constructor + -Wno-return-type + -Wno-reserved-identifier + -Wno-extra-semi-stmt + -Wno-extra-semi + -Wno-unused-result + -Wno-unreachable-code-return + -Wno-unused-parameter + -Wno-unreachable-code + -Wno-pessimizing-move + -Wno-unreachable-code-break + -Wno-unused-variable + -Wno-inconsistent-missing-override + -Wno-shadow-uncaptured-local + -Wno-suggest-override + -Wno-unused-member-function + -Wno-deprecated-this-capture +) + +target_link_libraries(substait_source PUBLIC + boost::headers_only + ch_contrib::protobuf + clickhouse_common_io + ch_contrib::hdfs + substrait +) + +target_include_directories(substait_source SYSTEM BEFORE PUBLIC + ${ARROW_INCLUDE_DIR} + ${CMAKE_BINARY_DIR}/contrib/arrow-cmake/cpp/src + ${ClickHouse_SOURCE_DIR}/contrib/arrow-cmake/cpp/src +) \ No newline at end of file diff --git a/utils/local-engine/Storages/SubstraitSource/FormatFile.cpp b/utils/local-engine/Storages/SubstraitSource/FormatFile.cpp new file mode 100644 index 000000000000..c89ed2cca821 --- /dev/null +++ b/utils/local-engine/Storages/SubstraitSource/FormatFile.cpp @@ -0,0 +1,51 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} +} +namespace local_engine +{ +FormatFile::FormatFile( + DB::ContextPtr context_, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr read_buffer_builder_) + : context(context_), file_info(file_info_), read_buffer_builder(read_buffer_builder_) +{ + PartitionValues part_vals = StringUtils::parsePartitionTablePath(file_info.uri_file()); + for (const auto & part : part_vals) + { + partition_keys.push_back(part.first); + partition_values[part.first] = part.second; + } +} + +FormatFilePtr FormatFileUtil::createFile(DB::ContextPtr context, ReadBufferBuilderPtr read_buffer_builder, const 
substrait::ReadRel::LocalFiles::FileOrFiles & file) +{ + if (file.has_parquet()) + { + return std::make_shared(context, file, read_buffer_builder); + } + else + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Format not supported: {}", file.DebugString()); + } + + __builtin_unreachable(); +} +} diff --git a/utils/local-engine/Storages/SubstraitSource/FormatFile.h b/utils/local-engine/Storages/SubstraitSource/FormatFile.h new file mode 100644 index 000000000000..59d8d720f2a0 --- /dev/null +++ b/utils/local-engine/Storages/SubstraitSource/FormatFile.h @@ -0,0 +1,71 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace local_engine +{ +class FormatFile +{ +public: + struct InputFormat + { + public: + explicit InputFormat(DB::InputFormatPtr input_, std::unique_ptr read_buffer_) + : input(input_), read_buffer(std::move(read_buffer_)) + { + } + DB::InputFormatPtr input; + private: + std::unique_ptr read_buffer; + }; + using InputFormatPtr = std::shared_ptr; + + FormatFile( + DB::ContextPtr context_, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr read_buffer_builder_); + virtual ~FormatFile() = default; + + /// create a new input format for reading this file + virtual InputFormatPtr createInputFormat(const DB::Block & header) = 0; + + /// Spark may split a large file into small segments and read them in different tasks + /// If this file doesn't support the split feature, only the task with offset 0 will generate data. + virtual bool supportSplit() { return false; } + + /// try to get rows from file metadata + virtual std::optional getTotalRows() { return {}; } + + /// get partition keys from file path + inline const std::vector & getFilePartitionKeys() const { return partition_keys; } + + inline const std::map & getFilePartitionValues() const { return partition_values; } + + virtual String getURIPath() const { return file_info.uri_file(); } + + virtual size_t getStartOffset() const { return file_info.start(); } + virtual size_t getLength() const { return file_info.length(); } + +protected: + DB::ContextPtr context; + substrait::ReadRel::LocalFiles::FileOrFiles file_info; + ReadBufferBuilderPtr read_buffer_builder; + std::vector partition_keys; + std::map partition_values; + +}; +using FormatFilePtr = std::shared_ptr; +using FormatFiles = std::vector; + +class FormatFileUtil +{ +public: + static FormatFilePtr createFile(DB::ContextPtr context, ReadBufferBuilderPtr read_buffer_builder, const substrait::ReadRel::LocalFiles::FileOrFiles & file); +}; +} diff --git a/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp new file mode 100644 index 000000000000..7df242608193 --- /dev/null +++ b/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp @@ -0,0 +1,91 @@ +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} +} + +namespace local_engine +{ +ParquetFormatFile::ParquetFormatFile(DB::ContextPtr context_, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr read_buffer_builder_) + : FormatFile(context_, file_info_, read_buffer_builder_) +{ +} + +FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(const DB::Block & header) +{ + auto row_group_indices = collectRowGroupIndices(); + auto read_buffer = read_buffer_builder->build(file_info); + auto
input_format = std::make_shared(*read_buffer, header, DB::FormatSettings(), row_group_indices); + auto res = std::make_shared(input_format, std::move(read_buffer)); + return res; +} + +std::optional ParquetFormatFile::getTotalRows() +{ + { + std::lock_guard lock(mutex); + if (total_rows) + return total_rows; + } + auto row_group_indices = collectRowGroupIndices(); + size_t rows = 0; + auto file_meta = reader->parquet_reader()->metadata(); + for (auto i : row_group_indices) + { + rows += file_meta->RowGroup(i)->num_rows(); + } + { + std::lock_guard lock(mutex); + total_rows = rows; + return total_rows; + } +} + +void ParquetFormatFile::prepareReader() +{ + std::lock_guard lock(mutex); + if (reader) + return; + auto in = read_buffer_builder->build(file_info); + DB::FormatSettings format_settings; + format_settings.seekable_read = true; + std::atomic is_stopped{0}; + auto status = parquet::arrow::OpenFile( + asArrowFile(*in, format_settings, is_stopped), arrow::default_memory_pool(), &reader); + if (!status.ok()) + { + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Open file({}) failed. {}", file_info.uri_file(), status.ToString()); + } +} + +std::vector ParquetFormatFile::collectRowGroupIndices() +{ + prepareReader(); + auto file_meta = reader->parquet_reader()->metadata(); + std::vector indices; + for (int i = 0, n = file_meta->num_row_groups(); i < n; ++i) + { + auto row_group_meta = file_meta->RowGroup(i); + auto offset = static_cast(row_group_meta->file_offset()); + if (!offset) + { + offset = static_cast(row_group_meta->ColumnChunk(0)->file_offset()); + } + if (file_info.start() <= offset && offset < file_info.start() + file_info.length()) + { + indices.push_back(i); + } + } + return indices; +} +} diff --git a/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.h b/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.h new file mode 100644 index 000000000000..839ec63005b1 --- /dev/null +++ b/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.h @@ -0,0 +1,26 @@ +#pragma once +#include +#include +#include +namespace local_engine +{ +class ParquetFormatFile : public FormatFile +{ +public: + explicit ParquetFormatFile(DB::ContextPtr context_, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr read_buffer_builder_); + ~ParquetFormatFile() override = default; + FormatFile::InputFormatPtr createInputFormat(const DB::Block & header) override; + std::optional getTotalRows() override; + bool supportSplit() override { return true; } + +private: + std::mutex mutex; + std::optional total_rows; + + std::unique_ptr reader; + void prepareReader(); + + std::vector collectRowGroupIndices(); +}; + +} diff --git a/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp new file mode 100644 index 000000000000..1a9d9468244b --- /dev/null +++ b/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp @@ -0,0 +1,193 @@ +#include +#include +#include +#include +#include "IO/ReadSettings.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} +} + +namespace local_engine +{ + +class LocalFileReadBufferBuilder : public ReadBufferBuilder +{ +public: + explicit LocalFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_) {} + ~LocalFileReadBufferBuilder() override 
= default; + + std::unique_ptr build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override + { + Poco::URI file_uri(file_info.uri_file()); + std::unique_ptr read_buffer; + const String & file_path = file_uri.getPath(); + struct stat file_stat; + if (stat(file_path.c_str(), &file_stat)) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "file stat failed for {}", file_path); + + if (S_ISREG(file_stat.st_mode)) + read_buffer = std::make_unique(file_path); + else + read_buffer = std::make_unique(file_path); + return read_buffer; + } +}; + +class HDFSFileReadBufferBuilder : public ReadBufferBuilder +{ +public: + explicit HDFSFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_) {} + ~HDFSFileReadBufferBuilder() override = default; + + std::unique_ptr build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override + { + Poco::URI file_uri(file_info.uri_file()); + std::unique_ptr read_buffer; + /// Need to set "hdfs.libhdfs3_conf" in global settings + read_buffer = std::make_unique( + "hdfs://" + file_uri.getHost(), file_uri.getPath(), context->getGlobalContext()->getConfigRef()); + return read_buffer; + } +}; + +#if USE_AWS_S3 +class S3FileReadBufferBuilder : public ReadBufferBuilder +{ +public: + explicit S3FileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_) {} + ~S3FileReadBufferBuilder() override = default; + + std::unique_ptr build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override + { + Poco::URI file_uri(file_info.uri_file()); + auto client = getClient(); + std::unique_ptr readbuffer; + readbuffer + = std::make_unique(client, file_uri.getHost(), file_uri.getPath().substr(1), 3, DB::ReadSettings()); + return readbuffer; + } +private: + std::shared_ptr shared_client; + + std::shared_ptr getClient() + { + if (shared_client) + return shared_client; + const auto & config = context->getConfigRef(); + String config_prefix = "s3"; + DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration( + config.getString(config_prefix + ".region", ""), + context->getRemoteHostFilter(), + context->getGlobalContext()->getSettingsRef().s3_max_redirects); + + DB::S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint"))); + + client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000); + client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000); + client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100); + client_configuration.endpointOverride = uri.endpoint; + + client_configuration.retryStrategy + = std::make_shared(config.getUInt(config_prefix + ".retry_attempts", 10)); + + shared_client = DB::S3::ClientFactory::instance().create( + client_configuration, + uri.is_virtual_hosted_style, + config.getString(config_prefix + ".access_key_id", ""), + config.getString(config_prefix + ".secret_access_key", ""), + config.getString(config_prefix + ".server_side_encryption_customer_key_base64", ""), + {}, + config.getBool(config_prefix + ".use_environment_credentials", config.getBool("s3.use_environment_credentials", false)), + config.getBool(config_prefix + ".use_insecure_imds_request", config.getBool("s3.use_insecure_imds_request", false))); + return shared_client; + } +}; +#endif + +#if USE_AZURE_BLOB_STORAGE +class AzureBlobReadBuffer : public ReadBufferBuilder +{ +public: + explicit AzureBlobReadBuffer(DB::ContextPtr context_) 
: ReadBufferBuilder(context_) {} + ~AzureBlobReadBuffer() override = default; + + std::unique_ptr build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) + { + Poco::URI file_uri(file_info.uri_file()); + std::unique_ptr read_buffer; + read_buffer = std::make_unique(getClient(), file_uri.getPath(), 5, 5, DBMS_DEFAULT_BUFFER_SIZE); + return read_buffer; + } +private: + std::shared_ptr shared_client; + + std::shared_ptr getClient() + { + if (shared_client) + return shared_client; + shared_client = DB::getAzureBlobContainerClient(context->getConfigRef(), "blob"); + return shared_client; + } +}; +#endif + +void registerReadBufferBuildes(ReadBufferBuilderFactory & factory) +{ + LOG_TRACE(&Poco::Logger::get("ReadBufferBuilderFactory"), "+registerReadBufferBuildes"); + factory.registerBuilder("file", [](DB::ContextPtr context_) { return std::make_shared(context_); }); + factory.registerBuilder("hdfs", [](DB::ContextPtr context_) { return std::make_shared(context_); }); + +#if USE_AWS_S3 + factory.registerBuilder("s3", [](DB::ContextPtr context_) { return std::make_shared(context_); }); +#endif + +#if USE_AZURE_BLOB_STORAGE + factory.registerBuilder("wasb", [](DB::ContextPtr context_) { return std::make_shared(context_); }); + factory.registerBuilder("wasbs", [](DB::ContextPtr context_) { return std::make_shared(context_); }); +#endif +} + +ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance() +{ + static ReadBufferBuilderFactory instance; + return instance; +} + +ReadBufferBuilderPtr ReadBufferBuilderFactory::createBuilder(const String & schema, DB::ContextPtr context) +{ + auto it = builders.find(schema); + if (it == builders.end()) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Not found read buffer builder for {}", schema); + return it->second(context); +} + +void ReadBufferBuilderFactory::registerBuilder(const String & schema, NewBuilder newer) +{ + auto it = builders.find(schema); + if (it != builders.end()) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "readbuffer builder for {} has been registered", schema); + builders[schema] = newer; +} + +} diff --git a/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h b/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h new file mode 100644 index 000000000000..02234ed9bd68 --- /dev/null +++ b/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h @@ -0,0 +1,38 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +namespace local_engine +{ +class ReadBufferBuilder +{ +public: + explicit ReadBufferBuilder(DB::ContextPtr context_) : context(context_) {} + virtual ~ReadBufferBuilder() = default; + /// build a new read buffer + virtual std::unique_ptr build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) = 0; +protected: + DB::ContextPtr context; +}; + +using ReadBufferBuilderPtr = std::shared_ptr; + +class ReadBufferBuilderFactory : public boost::noncopyable +{ +public: + using NewBuilder = std::function; + static ReadBufferBuilderFactory & instance(); + ReadBufferBuilderPtr createBuilder(const String & schema, DB::ContextPtr context); + + void registerBuilder(const String & schema, NewBuilder newer); + +private: + std::map builders; +}; + +void registerReadBufferBuildes(ReadBufferBuilderFactory & factory); +} diff --git a/utils/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp b/utils/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp new file mode 100644 index 000000000000..573a8743ba4e --- /dev/null +++ 
b/utils/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp @@ -0,0 +1,294 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int UNKNOWN_TYPE; + extern const int LOGICAL_ERROR; +} +} +namespace local_engine +{ +SubstraitFileSource::SubstraitFileSource(DB::ContextPtr context_, const DB::Block & header_, const substrait::ReadRel::LocalFiles & file_infos) + : DB::SourceWithProgress(header_, false) + , context(context_) + , output_header(header_) +{ + if (!output_header.columns()) + { + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Empty columns to read. Maybe use count(col) instead of count(1)/count(*)"); + } + + to_read_header = output_header; + if (file_infos.items_size()) + { + Poco::URI file_uri(file_infos.items().Get(0).uri_file()); + read_buffer_builder = ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), context); + for (const auto & item : file_infos.items()) + { + files.emplace_back(FormatFileUtil::createFile(context, read_buffer_builder, item)); + } + + auto partition_keys = files[0]->getFilePartitionKeys(); + /// file partition keys are read from the file path + for (const auto & key : partition_keys) + { + to_read_header.erase(key); + } + } +} + + +DB::Chunk SubstraitFileSource::generate() +{ + while (current_file_index < files.size()) + { + if (!tryPrepareReader()) + { + /// all files finished + return {}; + } + + DB::Chunk chunk; + if (file_reader->pull(chunk)) + return chunk; + + /// try to read from next file + file_reader.reset(); + } + /// no more files to read + return {}; +} + +bool SubstraitFileSource::tryPrepareReader() +{ + if (file_reader) [[likely]] + return true; + + if (current_file_index >= files.size()) + return false; + + auto current_file = files[current_file_index]; + current_file_index += 1; + + if (!current_file->supportSplit() && current_file->getStartOffset()) + { + /// For files that do not support the split strategy, a task with a non-zero offset will generate empty data + file_reader = std::make_unique(current_file); + return true; + } + if (!to_read_header.columns()) + { + auto total_rows = current_file->getTotalRows(); + if (total_rows) + { + file_reader = std::make_unique(current_file, context, output_header, *total_rows); + } + else + { + /// TODO: It may be a text format file that we do not have the stat metadata, e.g. total rows. + /// If we can get the file's schema, we can try to read all columns out.
maybe consider make this + /// scan action fallback into spark + throw DB::Exception( + DB::ErrorCodes::LOGICAL_ERROR, + "All columns to read is partition columns, but this file({}) doesn't support this case.", + current_file->getURIPath()); + } + } + else + { + file_reader = std::make_unique(current_file, context, to_read_header, output_header); + } + + return true; +} + + +DB::ColumnPtr FileReaderWrapper::createConstColumn(DB::DataTypePtr data_type, const DB::Field & field, size_t rows) +{ + auto nested_type = DB::removeNullable(data_type); + auto column = nested_type->createColumnConst(rows, field); + + if (data_type->isNullable()) + column = DB::ColumnNullable::create(column, DB::ColumnUInt8::create(rows, 0)); + return column; +} + +#define BUILD_INT_FIELD(type) \ + [](DB::ReadBuffer & in, const String &) \ + {\ + type value = 0;\ + DB::readIntText(value, in);\ + return DB::Field(value);\ + } + +#define BUILD_FP_FIELD(type) \ + [](DB::ReadBuffer & in, const String &) \ + {\ + type value = 0.0;\ + DB::readFloatText(value, in);\ + return DB::Field(value);\ + } + +DB::Field FileReaderWrapper::buildFieldFromString(const String & str_value, DB::DataTypePtr type) +{ + using FieldBuilder = std::function; + static std::map field_builders + = {{magic_enum::enum_integer(DB::TypeIndex::Int8), BUILD_INT_FIELD(Int8) }, + {magic_enum::enum_integer(DB::TypeIndex::Int16), BUILD_INT_FIELD(Int16) }, + {magic_enum::enum_integer(DB::TypeIndex::Int32), BUILD_INT_FIELD(Int32) }, + {magic_enum::enum_integer(DB::TypeIndex::Int64), BUILD_INT_FIELD(Int64) }, + {magic_enum::enum_integer(DB::TypeIndex::Float32), BUILD_FP_FIELD(DB::Float32) }, + {magic_enum::enum_integer(DB::TypeIndex::Float64), BUILD_FP_FIELD(DB::Float64)}, + {magic_enum::enum_integer(DB::TypeIndex::String), [](DB::ReadBuffer &, const String & val) { return DB::Field(val); }}, + {magic_enum::enum_integer(DB::TypeIndex::Date), + [](DB::ReadBuffer & in, const String &) + { + DayNum value; + readDateText(value, in); + return DB::Field(value); + }}, + {magic_enum::enum_integer(DB::TypeIndex::Date32), + [](DB::ReadBuffer & in, const String &) + { + ExtendedDayNum value; + readDateText(value, in); + return DB::Field(value.toUnderType()); + }}}; + + auto nested_type = DB::removeNullable(type); + DB::ReadBufferFromString read_buffer(str_value); + auto it = field_builders.find(magic_enum::enum_integer(nested_type->getTypeId())); + if (it == field_builders.end()) + throw DB::Exception(DB::ErrorCodes::UNKNOWN_TYPE, "Unsupported data type {}", type->getFamilyName()); + return it->second(read_buffer, str_value); +} + +ConstColumnsFileReader::ConstColumnsFileReader(FormatFilePtr file_, DB::ContextPtr context_, const DB::Block & header_, size_t block_size_) + : FileReaderWrapper(file_) + , context(context_) + , header(header_) + , remained_rows(0) + , block_size(block_size_) +{ + auto rows = file->getTotalRows(); + if (!rows) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot get total rows number from file : {}", file->getURIPath()); + remained_rows = *rows; +} + +bool ConstColumnsFileReader::pull(DB::Chunk & chunk) +{ + if (!remained_rows) [[unlikely]] + return false; + size_t to_read_rows = 0; + if (remained_rows < block_size) + { + to_read_rows = remained_rows; + remained_rows = 0; + } + else + { + to_read_rows = block_size; + remained_rows -= block_size; + } + DB::Columns res_columns; + size_t columns_num = header.columns(); + res_columns.reserve(columns_num); + const auto & partition_values = file->getFilePartitionValues(); + for (size_t pos = 
0; pos < columns_num; ++pos) + { + auto col_with_name_and_type = header.getByPosition(pos); + auto type = col_with_name_and_type.type; + const auto & name = col_with_name_and_type.name; + auto it = partition_values.find(name); + if (it == partition_values.end()) [[unlikely]] + { + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown partition column: {}", name); + } + auto field = buildFieldFromString(it->second, type); + auto column = createConstColumn(type, field, to_read_rows); + res_columns.emplace_back(column); + } + + chunk = DB::Chunk(std::move(res_columns), to_read_rows); + return true; +} + +NormalFileReader::NormalFileReader(FormatFilePtr file_, DB::ContextPtr context_, const DB::Block & to_read_header_, const DB::Block & output_header_) + : FileReaderWrapper(file_) + , context(context_) + , to_read_header(to_read_header_) + , output_header(output_header_) +{ + input_format = file->createInputFormat(to_read_header); + DB::Pipe pipe(input_format->input); + pipeline = std::make_unique(std::move(pipe)); + reader = std::make_unique(*pipeline); +} + + +bool NormalFileReader::pull(DB::Chunk & chunk) +{ + DB::Chunk tmp_chunk; + auto status = reader->pull(tmp_chunk); + if (!status) + { + return false; + } + + size_t rows = tmp_chunk.getNumRows(); + if (!rows) + return false; + + auto read_columns = tmp_chunk.detachColumns(); + DB::Columns res_columns; + auto columns_with_name_and_type = output_header.getColumnsWithTypeAndName(); + auto partition_values = file->getFilePartitionValues(); + + for (auto & column : columns_with_name_and_type) + { + if (to_read_header.has(column.name)) + { + auto pos = to_read_header.getPositionByName(column.name); + res_columns.push_back(read_columns[pos]); + } + else + { + auto it = partition_values.find(column.name); + if (it == partition_values.end()) + { + throw DB::Exception( + DB::ErrorCodes::LOGICAL_ERROR, "Column({}) not found in file({}) partition keys.", column.name, file->getURIPath()); + } + auto field = buildFieldFromString(it->second, column.type); + res_columns.push_back(createConstColumn(column.type, field, rows)); + } + } + chunk = DB::Chunk(std::move(res_columns), rows); + return true; +} +} diff --git a/utils/local-engine/Storages/SubstraitSource/SubstraitFileSource.h b/utils/local-engine/Storages/SubstraitSource/SubstraitFileSource.h new file mode 100644 index 000000000000..99609b0dc292 --- /dev/null +++ b/utils/local-engine/Storages/SubstraitSource/SubstraitFileSource.h @@ -0,0 +1,96 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +namespace local_engine +{ + +class FileReaderWrapper +{ +public: + explicit FileReaderWrapper(FormatFilePtr file_) : file(file_) {} + virtual ~FileReaderWrapper() = default; + virtual bool pull(DB::Chunk & chunk) = 0; + +protected: + FormatFilePtr file; + + static DB::ColumnPtr createConstColumn(DB::DataTypePtr type, const DB::Field & field, size_t rows); + static DB::Field buildFieldFromString(const String & value, DB::DataTypePtr type); +}; + +class NormalFileReader : public FileReaderWrapper +{ +public: + NormalFileReader(FormatFilePtr file_, DB::ContextPtr context_, const DB::Block & to_read_header_, const DB::Block & output_header_); + ~NormalFileReader() override = default; + bool pull(DB::Chunk & chunk) override; + +private: + DB::ContextPtr context; + DB::Block to_read_header; + DB::Block output_header; + + FormatFile::InputFormatPtr input_format; + std::unique_ptr pipeline; +
std::unique_ptr reader; +}; + +class EmptyFileReader : public FileReaderWrapper +{ +public: + explicit EmptyFileReader(FormatFilePtr file_) : FileReaderWrapper(file_) {} + ~EmptyFileReader() override = default; + bool pull(DB::Chunk &) override { return false; } +}; + +class ConstColumnsFileReader : public FileReaderWrapper +{ +public: + ConstColumnsFileReader(FormatFilePtr file_, DB::ContextPtr context_, const DB::Block & header_, size_t block_size_ = DEFAULT_BLOCK_SIZE); + ~ConstColumnsFileReader() override = default; + bool pull(DB::Chunk & chunk); +private: + DB::ContextPtr context; + DB::Block header; + size_t remained_rows; + size_t block_size; +}; + +class SubstraitFileSource : public DB::SourceWithProgress +{ +public: + SubstraitFileSource(DB::ContextPtr context_, const DB::Block & header_, const substrait::ReadRel::LocalFiles & file_infos); + ~SubstraitFileSource() override = default; + + String getName() const override + { + return "SubstraitFileSource"; + } +protected: + DB::Chunk generate() override; +private: + DB::ContextPtr context; + DB::Block output_header; + DB::Block to_read_header; + FormatFiles files; + + UInt32 current_file_index = 0; + std::unique_ptr file_reader; + ReadBufferBuilderPtr read_buffer_builder; + + bool tryPrepareReader(); +}; +} diff --git a/utils/local-engine/local_engine_jni.cpp b/utils/local-engine/local_engine_jni.cpp index cc1c2d36619e..8a557b25e3bf 100644 --- a/utils/local-engine/local_engine_jni.cpp +++ b/utils/local-engine/local_engine_jni.cpp @@ -21,6 +21,7 @@ #include #include #include +#include bool inside_main = true; @@ -137,6 +138,7 @@ jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/) = local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "next", "()[B"); local_engine::JNIUtils::vm = vm; + local_engine::registerReadBufferBuildes(local_engine::ReadBufferBuilderFactory::instance()); return JNI_VERSION_1_8; } diff --git a/utils/local-engine/tests/benchmark_local_engine.cpp b/utils/local-engine/tests/benchmark_local_engine.cpp index 9a4d066c3ecd..3674faa60b52 100644 --- a/utils/local-engine/tests/benchmark_local_engine.cpp +++ b/utils/local-engine/tests/benchmark_local_engine.cpp @@ -22,7 +22,7 @@ #include #include #include -#include +#include #include #include #include @@ -34,6 +34,7 @@ #include #include #include +#include #include "testConfig.h" #if defined(__SSE2__) @@ -168,13 +169,16 @@ DB::ContextMutablePtr global_context; for (auto _ : state) { - auto files = std::make_shared(); - files->files = { - "file:///home/hongbin/code/gluten/jvm/src/test/resources/tpch-data/lineitem/" - "part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet", - }; + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items(); + std::string file_path = "file:///home/hongbin/code/gluten/jvm/src/test/resources/tpch-data/lineitem/" + "part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet"; + file->set_uri_file(file_path); + substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file->mutable_parquet()->CopyFrom(parquet_format); auto builder = std::make_unique(); - builder->init(Pipe(std::make_shared(files, header, SerializedPlanParser::global_context))); + builder->init(Pipe(std::make_shared(SerializedPlanParser::global_context, header, files))); + auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto executor = PullingPipelineExecutor(pipeline); auto result = header.cloneEmpty(); diff --git 
a/utils/local-engine/tests/benchmark_parquet_read.cpp b/utils/local-engine/tests/benchmark_parquet_read.cpp index 47636166bb65..3d6bef2f084a 100644 --- a/utils/local-engine/tests/benchmark_parquet_read.cpp +++ b/utils/local-engine/tests/benchmark_parquet_read.cpp @@ -16,8 +16,9 @@ #include #include #include -#include +#include #include +#include static void BM_ParquetReadString(benchmark::State& state) { @@ -82,12 +83,15 @@ static void BM_OptimizedParquetReadString(benchmark::State& state) for (auto _ : state) { - auto files_info = std::make_shared(); - files_info->files = {file}; + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file_item = files.add_items(); + file_item->set_uri_file(file); + substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file_item->mutable_parquet()->CopyFrom(parquet_format); auto builder = std::make_unique(); - builder->init(Pipe(std::make_shared( - files_info, header, local_engine::SerializedPlanParser::global_context))); + builder->init( + Pipe(std::make_shared(local_engine::SerializedPlanParser::global_context, header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto reader = PullingPipelineExecutor(pipeline); while (reader.pull(res)) @@ -112,12 +116,15 @@ static void BM_OptimizedParquetReadDate32(benchmark::State& state) for (auto _ : state) { - auto files_info = std::make_shared(); - files_info->files = {file}; + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file_item = files.add_items(); + file_item->set_uri_file(file); + substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file_item->mutable_parquet()->CopyFrom(parquet_format); auto builder = std::make_unique(); - builder->init(Pipe(std::make_shared( - files_info, header, local_engine::SerializedPlanParser::global_context))); + builder->init( + Pipe(std::make_shared(local_engine::SerializedPlanParser::global_context, header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto reader = PullingPipelineExecutor(pipeline); while (reader.pull(res)) diff --git a/utils/local-engine/tests/gtest_ch_join.cpp b/utils/local-engine/tests/gtest_ch_join.cpp index 3fc258c3740b..28af2d08c000 100644 --- a/utils/local-engine/tests/gtest_ch_join.cpp +++ b/utils/local-engine/tests/gtest_ch_join.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include #include @@ -16,6 +16,7 @@ #include #include +#include using namespace DB; diff --git a/utils/local-engine/tests/gtest_ch_storages.cpp b/utils/local-engine/tests/gtest_ch_storages.cpp index 23804b26db33..8e3ce010843f 100644 --- a/utils/local-engine/tests/gtest_ch_storages.cpp +++ b/utils/local-engine/tests/gtest_ch_storages.cpp @@ -1,12 +1,13 @@ #include #include -#include +#include #include #include #include #include #include #include +#include using namespace DB; using namespace local_engine; @@ -24,8 +25,12 @@ TEST(TestBatchParquetFileSource, blob) "devstoreaccount1;"); auto builder = std::make_unique(); - auto files = std::make_shared(); - files->files = {"wasb://libch/parquet/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"}; + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items(); + std::string file_path = "wasb://libch/parquet/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"; + file->set_uri_file(file_path); + 
substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file->mutable_parquet()->CopyFrom(parquet_format); const auto * type_string = "columns format version: 1\n" "15 columns:\n" @@ -55,7 +60,7 @@ TEST(TestBatchParquetFileSource, blob) columns.emplace_back(std::move(col)); } auto header = Block(std::move(columns)); - builder->init(Pipe(std::make_shared(files, header, SerializedPlanParser::global_context))); + builder->init(Pipe(std::make_shared(SerializedPlanParser::global_context, header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto executor = PullingPipelineExecutor(pipeline); @@ -83,9 +88,12 @@ TEST(TestBatchParquetFileSource, s3) config->setString("s3.secret_access_key", "password"); auto builder = std::make_unique(); - auto files = std::make_shared(); - files->files = {"s3://tpch/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"}; - // files->files = {"file:///home/saber/Downloads/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"}; + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items(); + std::string file_path = "s3://tpch/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"; + file->set_uri_file(file_path); + substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file->mutable_parquet()->CopyFrom(parquet_format); const auto * type_string = "columns format version: 1\n" "15 columns:\n" @@ -115,7 +123,7 @@ TEST(TestBatchParquetFileSource, s3) columns.emplace_back(std::move(col)); } auto header = Block(std::move(columns)); - builder->init(Pipe(std::make_shared(files, header, SerializedPlanParser::global_context))); + builder->init(Pipe(std::make_shared(SerializedPlanParser::global_context, header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto executor = PullingPipelineExecutor(pipeline); @@ -137,12 +145,18 @@ TEST(TestBatchParquetFileSource, s3) TEST(TestBatchParquetFileSource, local_file) { auto builder = std::make_unique(); - auto files = std::make_shared(); - files->files = { - "file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet", - "file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00001-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet", - "file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00002-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet", - }; + + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items(); + file->set_uri_file("file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"); + substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file->mutable_parquet()->CopyFrom(parquet_format); + file = files.add_items(); + file->set_uri_file("file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00001-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"); + file->mutable_parquet()->CopyFrom(parquet_format); + file = files.add_items(); + file->set_uri_file("file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00002-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"); + file->mutable_parquet()->CopyFrom(parquet_format); const auto * type_string = "columns format version: 1\n" "2 columns:\n" @@ -172,7 +186,7 @@ 
TEST(TestBatchParquetFileSource, local_file) columns.emplace_back(std::move(col)); } auto header = Block(std::move(columns)); - builder->init(Pipe(std::make_shared(files, header, SerializedPlanParser::global_context))); + builder->init(Pipe(std::make_shared(SerializedPlanParser::global_context, header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto executor = PullingPipelineExecutor(pipeline); @@ -234,9 +248,12 @@ TEST(TestWrite, MergeTreeWriteTest) std::move(settings) ); - auto files_info = std::make_shared(); - files_info->files = {"s3://tpch/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"}; - auto source = std::make_shared(files_info, metadata->getSampleBlock(), SerializedPlanParser::global_context); + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items(); + file->set_uri_file("s3://tpch/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"); + substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file->mutable_parquet()->CopyFrom(parquet_format); + auto source = std::make_shared(SerializedPlanParser::global_context, metadata->getSampleBlock(), files); QueryPipelineBuilder query_pipeline_builder; query_pipeline_builder.init(Pipe(source)); diff --git a/utils/local-engine/tests/gtest_local_engine.cpp b/utils/local-engine/tests/gtest_local_engine.cpp index d8e90f2feb59..49a4daf880f6 100644 --- a/utils/local-engine/tests/gtest_local_engine.cpp +++ b/utils/local-engine/tests/gtest_local_engine.cpp @@ -14,13 +14,14 @@ #include #include #include -#include +#include #include #include #include #include #include #include +#include #include "Storages/CustomStorageMergeTree.h" #include "testConfig.h" @@ -205,9 +206,14 @@ TEST(TestSelect, MergeTreeWriteTest) DB::StorageID("default", "test"), "test-intel/", *metadata, false, global_context, "", param, std::move(settings)); auto sink = std::make_shared(custom_merge_tree, metadata, global_context); - auto files_info = std::make_shared(); - files_info->files.push_back("/home/kyligence/Documents/test-dataset/intel-gazelle-test-150.snappy.parquet"); - auto source = std::make_shared(files_info, metadata->getSampleBlock(), SerializedPlanParser::global_context); + + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items(); + std::string file_path = "file:///home/kyligence/Documents/test-dataset/intel-gazelle-test-150.snappy.parquet"; + file->set_uri_file(file_path); + substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file->mutable_parquet()->CopyFrom(parquet_format); + auto source = std::make_shared(SerializedPlanParser::global_context, metadata->getSampleBlock(), files); QueryPipelineBuilder query_pipeline; query_pipeline.init(Pipe(source));
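For reference, all of the tests and benchmarks touched above drive the new source the same way: build a substrait::ReadRel::LocalFiles message, mark each item with its format, construct a SubstraitFileSource, and pull chunks through a query pipeline. Below is a minimal sketch of that pattern (not taken verbatim from the patch); it assumes the read buffer builders have been registered (as JNI_OnLoad now does), an initialized SerializedPlanParser::global_context, and a DB::Block `header` describing the columns to read; the file path is a placeholder.

    using namespace DB;
    using namespace local_engine;

    // Register the file/hdfs/s3/wasb read buffer builders once per process.
    registerReadBufferBuildes(ReadBufferBuilderFactory::instance());

    // Describe the files to scan; the scheme of the first URI selects the ReadBufferBuilder.
    substrait::ReadRel::LocalFiles files;
    auto * file = files.add_items();
    file->set_uri_file("file:///path/to/part-00000.snappy.parquet"); // placeholder path
    substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format;
    file->mutable_parquet()->CopyFrom(parquet_format); // mark the item as parquet

    // `header` lists the columns to read; partition columns are rebuilt from the file path.
    auto source = std::make_shared<SubstraitFileSource>(SerializedPlanParser::global_context, header, files);

    QueryPipelineBuilder builder;
    builder.init(Pipe(source));
    auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
    PullingPipelineExecutor executor(pipeline);
    Chunk chunk;
    while (executor.pull(chunk))
    {
        // consume chunk
    }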