diff --git a/utils/local-engine/CMakeLists.txt b/utils/local-engine/CMakeLists.txt index 28ae0902247c..f4bbe99c835f 100644 --- a/utils/local-engine/CMakeLists.txt +++ b/utils/local-engine/CMakeLists.txt @@ -30,6 +30,7 @@ include_directories( ) add_subdirectory(Storages/ch_parquet) +add_subdirectory(Storages/SubstraitSource) add_library(${LOCALENGINE_SHARED_LIB} SHARED ${builder_sources} @@ -39,6 +40,7 @@ add_library(${LOCALENGINE_SHARED_LIB} SHARED ${external_sources} ${shuffle_sources} ${jni_sources} + ${substrait_source} ${operator_sources} local_engine_jni.cpp) @@ -55,6 +57,7 @@ target_link_libraries(${LOCALENGINE_SHARED_LIB} PUBLIC substrait loggers ch_parquet + substait_source ) # if (USE_LIBCXX) diff --git a/utils/local-engine/Parser/SerializedPlanParser.cpp b/utils/local-engine/Parser/SerializedPlanParser.cpp index 558bcc55fb2f..7d32c01d2a89 100644 --- a/utils/local-engine/Parser/SerializedPlanParser.cpp +++ b/utils/local-engine/Parser/SerializedPlanParser.cpp @@ -31,7 +31,6 @@ #include #include #include -#include #include #include #include @@ -42,6 +41,8 @@ #include #include #include +#include + #include "SerializedPlanParser.h" namespace DB @@ -53,7 +54,6 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; extern const int NO_SUCH_DATA_PART; extern const int UNKNOWN_FUNCTION; - extern const int NOT_IMPLEMENTED; } } @@ -200,35 +200,12 @@ QueryPlanPtr SerializedPlanParser::parseReadRealWithLocalFile(const substrait::R { assert(rel.has_local_files()); assert(rel.has_base_schema()); - auto files_info = std::make_shared(); - for (const auto & item : rel.local_files().items()) - { - files_info->files.push_back(item.uri_file()); - } auto header = parseNameStruct(rel.base_schema()); - PartitionValues partition_values = StringUtils::parsePartitionTablePath(files_info->files[0]); - if (partition_values.size() > 1) - { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "doesn't support multiple level partition."); - } - ProcessorPtr partition_transform; - if (!partition_values.empty()) - { - auto origin_header = header.cloneEmpty(); - PartitionValue partition_value = partition_values[0]; - header.erase(partition_value.first); - partition_transform - = std::make_shared(header, origin_header, partition_value.first, partition_value.second); - } - auto query_plan = std::make_unique(); - std::shared_ptr source = std::make_shared(files_info, header, context); + auto source = std::make_shared(context, header, rel.local_files()); auto source_pipe = Pipe(source); - if (partition_transform) - { - source_pipe.addTransform(partition_transform); - } - auto source_step = std::make_unique(std::move(source_pipe), "Parquet"); - source_step->setStepDescription("Read Parquet"); + auto source_step = std::make_unique(std::move(source_pipe), "substrait local files"); + source_step->setStepDescription("read local files"); + auto query_plan = std::make_unique(); query_plan->addStep(std::move(source_step)); return query_plan; } diff --git a/utils/local-engine/Storages/ArrowParquetBlockInputFormat.cpp b/utils/local-engine/Storages/ArrowParquetBlockInputFormat.cpp index a3f3bb7b9599..1b314205a22e 100644 --- a/utils/local-engine/Storages/ArrowParquetBlockInputFormat.cpp +++ b/utils/local-engine/Storages/ArrowParquetBlockInputFormat.cpp @@ -13,8 +13,9 @@ using namespace DB; namespace local_engine { ArrowParquetBlockInputFormat::ArrowParquetBlockInputFormat( - DB::ReadBuffer & in_, const DB::Block & header, const DB::FormatSettings & formatSettings) + DB::ReadBuffer & in_, const DB::Block & header, const 
DB::FormatSettings & formatSettings, const std::vector & row_group_indices_) : OptimizedParquetBlockInputFormat(in_, header, formatSettings) + , row_group_indices(row_group_indices_) { } @@ -74,8 +75,11 @@ DB::Chunk ArrowParquetBlockInputFormat::generate() } index += indexes_count; } - auto row_group_range = boost::irange(0, file_reader->num_row_groups()); - auto row_group_indices = std::vector(row_group_range.begin(), row_group_range.end()); + if (row_group_indices.empty()) + { + auto row_group_range = boost::irange(0, file_reader->num_row_groups()); + row_group_indices = std::vector(row_group_range.begin(), row_group_range.end()); + } auto read_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, ¤t_record_batch_reader); if (!read_status.ok()) throw std::runtime_error{"Error while reading Parquet data: " + read_status.ToString()}; diff --git a/utils/local-engine/Storages/ArrowParquetBlockInputFormat.h b/utils/local-engine/Storages/ArrowParquetBlockInputFormat.h index 4c6809b8b300..e01741f5be8b 100644 --- a/utils/local-engine/Storages/ArrowParquetBlockInputFormat.h +++ b/utils/local-engine/Storages/ArrowParquetBlockInputFormat.h @@ -16,7 +16,8 @@ namespace local_engine class ArrowParquetBlockInputFormat : public DB::OptimizedParquetBlockInputFormat { public: - ArrowParquetBlockInputFormat(DB::ReadBuffer & in, const DB::Block & header, const DB::FormatSettings & formatSettings); + ArrowParquetBlockInputFormat(DB::ReadBuffer & in, const DB::Block & header, const DB::FormatSettings & formatSettings, const std::vector & row_group_indices_ = {}); + //virtual ~ArrowParquetBlockInputFormat(); private: DB::Chunk generate() override; @@ -24,6 +25,7 @@ class ArrowParquetBlockInputFormat : public DB::OptimizedParquetBlockInputFormat int64_t convert_time = 0; int64_t non_convert_time = 0; std::shared_ptr current_record_batch_reader; + std::vector row_group_indices; }; } diff --git a/utils/local-engine/Storages/BatchParquetFileSource.cpp b/utils/local-engine/Storages/BatchParquetFileSource.cpp deleted file mode 100644 index 9b229631d88b..000000000000 --- a/utils/local-engine/Storages/BatchParquetFileSource.cpp +++ /dev/null @@ -1,162 +0,0 @@ -#include "BatchParquetFileSource.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using namespace DB; - -namespace local_engine -{ -#if USE_AWS_S3 - -std::shared_ptr -getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context) -{ - S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration( - config.getString(config_prefix + ".region", ""), - context->getRemoteHostFilter(), - context->getGlobalContext()->getSettingsRef().s3_max_redirects); - - S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint"))); - - client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000); - client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000); - client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100); - client_configuration.endpointOverride = uri.endpoint; - - client_configuration.retryStrategy - = std::make_shared(config.getUInt(config_prefix + ".retry_attempts", 10)); - - return S3::ClientFactory::instance().create( - client_configuration, - uri.is_virtual_hosted_style, - config.getString(config_prefix + ".access_key_id", ""), - 
config.getString(config_prefix + ".secret_access_key", ""), - config.getString(config_prefix + ".server_side_encryption_customer_key_base64", ""), - {}, - config.getBool(config_prefix + ".use_environment_credentials", config.getBool("s3.use_environment_credentials", false)), - config.getBool(config_prefix + ".use_insecure_imds_request", config.getBool("s3.use_insecure_imds_request", false))); -} -#endif - -BatchParquetFileSource::BatchParquetFileSource(FilesInfoPtr files_info_, const DB::Block & header_, const DB::ContextPtr & context_) - : SourceWithProgress(header_), files_info(files_info_), header(header_), context(context_) -{ -} -DB::Chunk BatchParquetFileSource::generate() -{ - while (!finished_generate) - { - /// Open file lazily on first read. This is needed to avoid too many open files from different streams. - if (!reader) - { - auto current_file = files_info->next_file_to_read.fetch_add(1); - if (current_file >= files_info->files.size()) - { - finished_generate = true; - return {}; - } - - current_path = files_info->files[current_file]; - - read_buf = getReadBufferFromFileURI(current_path); - ProcessorPtr format = std::make_shared(*read_buf, header, DB::FormatSettings()); - QueryPipelineBuilder builder; - builder.init(Pipe(format)); - pipeline = QueryPipelineBuilder::getPipeline(std::move(builder)); - - reader = std::make_unique(pipeline); - } - - Chunk chunk; - if (reader->pull(chunk)) - { - return chunk; - } - - /// Close file prematurely if stream was ended. - reader.reset(); - pipeline.reset(); - read_buf.reset(); - } - - return {}; -} -std::unique_ptr BatchParquetFileSource::getReadBufferFromFileURI(const String & file) -{ - Poco::URI file_uri(file); - const auto & schema = file_uri.getScheme(); - if (schema == "file") - { - return getReadBufferFromLocal(file_uri.getPath()); - } -#if USE_AWS_S3 - else if (schema == "s3") - { - return getReadBufferFromS3(file_uri.getHost(), file_uri.getPath().substr(1)); - } -#endif -#if USE_AZURE_BLOB_STORAGE - else if (schema == "wasb" || schema == "wasbs") - { - return getReadBufferFromBlob(file_uri.getPath().substr(1)); - } -#endif - else - { - throw std::runtime_error("unsupported schema " + schema); - } -} -std::unique_ptr BatchParquetFileSource::getReadBufferFromLocal(const String & file) -{ - std::unique_ptr read_buffer; - - struct stat file_stat - { - }; - - /// Check if file descriptor allows random reads (and reading it twice). 
- if (0 != stat(file.c_str(), &file_stat)) - throw std::runtime_error("Cannot stat file " + file); - - if (S_ISREG(file_stat.st_mode)) - read_buffer = std::make_unique(file); - else - read_buffer = std::make_unique(file); - return read_buffer; -} - -#if USE_AWS_S3 -std::unique_ptr BatchParquetFileSource::getReadBufferFromS3(const String & bucket, const String & key) -{ - if (!s3_client) - { - s3_client = getClient(context->getConfigRef(), "s3", context); - } - return std::make_unique(s3_client, bucket, key, 3, ReadSettings()); -} -#endif - -#if USE_AZURE_BLOB_STORAGE -std::unique_ptr BatchParquetFileSource::getReadBufferFromBlob(const String & file) -{ - if (!blob_container_client) - { - blob_container_client = getAzureBlobContainerClient(context->getConfigRef(), "blob"); - } - return std::make_unique(blob_container_client, file, 5, 5, DBMS_DEFAULT_BUFFER_SIZE); -} -#endif - -} diff --git a/utils/local-engine/Storages/BatchParquetFileSource.h b/utils/local-engine/Storages/BatchParquetFileSource.h deleted file mode 100644 index 741aab3a7306..000000000000 --- a/utils/local-engine/Storages/BatchParquetFileSource.h +++ /dev/null @@ -1,81 +0,0 @@ -#pragma once - -#if !defined(ARCADIA_BUILD) -#include -#endif - -#include -#include -#include -#include -#include -#include - -#if USE_AZURE_BLOB_STORAGE -#include -#endif - -#if USE_AWS_S3 -#include -#endif - - -namespace local_engine -{ - -struct FilesInfo -{ - std::vector files; - std::atomic next_file_to_read = 0; -}; -using FilesInfoPtr = std::shared_ptr; - -class BatchParquetFileSource : public DB::SourceWithProgress -{ -public: - BatchParquetFileSource(FilesInfoPtr files, const DB::Block & header, const DB::ContextPtr & context_); - -private: - String getName() const override - { - return "BatchParquetFileSource"; - } - - std::unique_ptr getReadBufferFromFileURI(const String &file); - - std::unique_ptr getReadBufferFromLocal(const String &file); - -#if USE_AWS_S3 - std::unique_ptr getReadBufferFromS3(const String &bucket, const String &key); -#endif - -#if USE_AZURE_BLOB_STORAGE - std::unique_ptr getReadBufferFromBlob(const String &file); -#endif - -protected: - DB::Chunk generate() override; - -private: - FilesInfoPtr files_info; - std::unique_ptr read_buf; - DB::QueryPipeline pipeline; - std::unique_ptr reader; - bool finished_generate = false; - std::string current_path; - DB::Block header; - DB::ContextPtr context; -#if USE_AZURE_BLOB_STORAGE - std::shared_ptr blob_container_client; -#endif - -#if USE_AWS_S3 - std::shared_ptr s3_client; -#endif - -}; - -using BatchParquetFileSourcePtr = std::shared_ptr; -} - - diff --git a/utils/local-engine/Storages/SubstraitSource/CMakeLists.txt b/utils/local-engine/Storages/SubstraitSource/CMakeLists.txt new file mode 100644 index 000000000000..75296f21e658 --- /dev/null +++ b/utils/local-engine/Storages/SubstraitSource/CMakeLists.txt @@ -0,0 +1,45 @@ + +set(ARROW_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src") + + +macro(add_headers_and_sources_including_cc prefix common_path) + add_glob(${prefix}_headers ${CMAKE_CURRENT_SOURCE_DIR} ${common_path}/*.h) + add_glob(${prefix}_sources ${common_path}/*.cpp ${common_path}/*.c ${common_path}/*.cc ${common_path}/*.h) +endmacro() + +add_headers_and_sources(substait_source .) 
+add_headers_and_sources_including_cc(ch_parquet arrow)
+add_library(substait_source ${substait_source_sources})
+target_compile_options(substait_source PUBLIC -fPIC
+    -Wno-shadow-field-in-constructor
+    -Wno-return-type
+    -Wno-reserved-identifier
+    -Wno-extra-semi-stmt
+    -Wno-extra-semi
+    -Wno-unused-result
+    -Wno-unreachable-code-return
+    -Wno-unused-parameter
+    -Wno-unreachable-code
+    -Wno-pessimizing-move
+    -Wno-unreachable-code-break
+    -Wno-unused-variable
+    -Wno-inconsistent-missing-override
+    -Wno-shadow-uncaptured-local
+    -Wno-suggest-override
+    -Wno-unused-member-function
+    -Wno-deprecated-this-capture
+)
+
+target_link_libraries(substait_source PUBLIC
+    boost::headers_only
+    ch_contrib::protobuf
+    clickhouse_common_io
+    ch_contrib::hdfs
+    substrait
+)
+
+target_include_directories(substait_source SYSTEM BEFORE PUBLIC
+    ${ARROW_INCLUDE_DIR}
+    ${CMAKE_BINARY_DIR}/contrib/arrow-cmake/cpp/src
+    ${ClickHouse_SOURCE_DIR}/contrib/arrow-cmake/cpp/src
+)
\ No newline at end of file
diff --git a/utils/local-engine/Storages/SubstraitSource/FormatFile.cpp b/utils/local-engine/Storages/SubstraitSource/FormatFile.cpp
new file mode 100644
index 000000000000..c89ed2cca821
--- /dev/null
+++ b/utils/local-engine/Storages/SubstraitSource/FormatFile.cpp
@@ -0,0 +1,51 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int NOT_IMPLEMENTED;
+}
+}
+namespace local_engine
+{
+FormatFile::FormatFile(
+    DB::ContextPtr context_, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr read_buffer_builder_)
+    : context(context_), file_info(file_info_), read_buffer_builder(read_buffer_builder_)
+{
+    PartitionValues part_vals = StringUtils::parsePartitionTablePath(file_info.uri_file());
+    for (const auto & part : part_vals)
+    {
+        partition_keys.push_back(part.first);
+        partition_values[part.first] = part.second;
+    }
+}
+
+FormatFilePtr FormatFileUtil::createFile(DB::ContextPtr context, ReadBufferBuilderPtr read_buffer_builder, const substrait::ReadRel::LocalFiles::FileOrFiles & file)
+{
+    if (file.has_parquet())
+    {
+        return std::make_shared(context, file, read_buffer_builder);
+    }
+    else
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Format not supported:{}", file.DebugString());
+    }
+
+    __builtin_unreachable();
+}
+}
diff --git a/utils/local-engine/Storages/SubstraitSource/FormatFile.h b/utils/local-engine/Storages/SubstraitSource/FormatFile.h
new file mode 100644
index 000000000000..59d8d720f2a0
--- /dev/null
+++ b/utils/local-engine/Storages/SubstraitSource/FormatFile.h
@@ -0,0 +1,71 @@
+#pragma once
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace local_engine
+{
+class FormatFile
+{
+public:
+    struct InputFormat
+    {
+    public:
+        explicit InputFormat(DB::InputFormatPtr input_, std::unique_ptr read_buffer_)
+            : input(input_), read_buffer(std::move(read_buffer_))
+        {
+        }
+        DB::InputFormatPtr input;
+    private:
+        std::unique_ptr read_buffer;
+    };
+    using InputFormatPtr = std::shared_ptr;
+
+    FormatFile(
+        DB::ContextPtr context_, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr read_buffer_builder_);
+    virtual ~FormatFile() = default;
+
+    /// create a new input format for reading this file
+    virtual InputFormatPtr createInputFormat(const DB::Block & header) = 0;
+
+    /// Spark would split a large file into small segments and read them in different tasks
+    /// If this file doesn't support the split feature, only the task with offset 0 will generate data.
+    virtual bool supportSplit() { return false; }
+
+    /// try to get rows from file metadata
+    virtual std::optional getTotalRows() { return {}; }
+
+    /// get partition keys from file path
+    inline const std::vector & getFilePartitionKeys() const { return partition_keys; }
+
+    inline const std::map & getFilePartitionValues() const { return partition_values; }
+
+    virtual String getURIPath() const { return file_info.uri_file(); }
+
+    virtual size_t getStartOffset() const { return file_info.start(); }
+    virtual size_t getLength() const { return file_info.length(); }
+
+protected:
+    DB::ContextPtr context;
+    substrait::ReadRel::LocalFiles::FileOrFiles file_info;
+    ReadBufferBuilderPtr read_buffer_builder;
+    std::vector partition_keys;
+    std::map partition_values;
+
+};
+using FormatFilePtr = std::shared_ptr;
+using FormatFiles = std::vector;
+
+class FormatFileUtil
+{
+public:
+    static FormatFilePtr createFile(DB::ContextPtr context, ReadBufferBuilderPtr read_buffer_builder, const substrait::ReadRel::LocalFiles::FileOrFiles & file);
+};
+}
diff --git a/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
new file mode 100644
index 000000000000..7df242608193
--- /dev/null
+++ b/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
@@ -0,0 +1,91 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int BAD_ARGUMENTS;
+}
+}
+
+namespace local_engine
+{
+ParquetFormatFile::ParquetFormatFile(DB::ContextPtr context_, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr read_buffer_builder_)
+    : FormatFile(context_, file_info_, read_buffer_builder_)
+{
+}
+
+FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(const DB::Block & header)
+{
+    auto row_group_indices = collectRowGroupIndices();
+    auto read_buffer = read_buffer_builder->build(file_info);
+    auto input_format = std::make_shared(*read_buffer, header, DB::FormatSettings(), row_group_indices);
+    auto res = std::make_shared(input_format, std::move(read_buffer));
+    return res;
+}
+
+std::optional ParquetFormatFile::getTotalRows()
+{
+    {
+        std::lock_guard lock(mutex);
+        if (total_rows)
+            return total_rows;
+    }
+    auto row_group_indices = collectRowGroupIndices();
+    size_t rows = 0;
+    auto file_meta = reader->parquet_reader()->metadata();
+    for (auto i : row_group_indices)
+    {
+        rows += file_meta->RowGroup(i)->num_rows();
+    }
+    {
+        std::lock_guard lock(mutex);
+        total_rows = rows;
+        return total_rows;
+    }
+}
+
+void ParquetFormatFile::prepareReader()
+{
+    std::lock_guard lock(mutex);
+    if (reader)
+        return;
+    auto in = read_buffer_builder->build(file_info);
+    DB::FormatSettings format_settings;
+    format_settings.seekable_read = true;
+    std::atomic is_stopped{0};
+    auto status = parquet::arrow::OpenFile(
+        asArrowFile(*in, format_settings, is_stopped), arrow::default_memory_pool(), &reader);
+    if (!status.ok())
+    {
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Open file({}) failed. 
{}", file_info.uri_file(), status.ToString()); + } +} + +std::vector ParquetFormatFile::collectRowGroupIndices() +{ + prepareReader(); + auto file_meta = reader->parquet_reader()->metadata(); + std::vector indices; + for (int i = 0, n = file_meta->num_row_groups(); i < n; ++i) + { + auto row_group_meta = file_meta->RowGroup(i); + auto offset = static_cast(row_group_meta->file_offset()); + if (!offset) + { + offset = static_cast(row_group_meta->ColumnChunk(0)->file_offset()); + } + if (file_info.start() <= offset && offset < file_info.start() + file_info.length()) + { + indices.push_back(i); + } + } + return indices; +} +} diff --git a/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.h b/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.h new file mode 100644 index 000000000000..839ec63005b1 --- /dev/null +++ b/utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.h @@ -0,0 +1,26 @@ +#pragma once +#include +#include +#include +namespace local_engine +{ +class ParquetFormatFile : public FormatFile +{ +public: + explicit ParquetFormatFile(DB::ContextPtr context_, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr read_buffer_builder_); + ~ParquetFormatFile() override = default; + FormatFile::InputFormatPtr createInputFormat(const DB::Block & header) override; + std::optional getTotalRows() override; + bool supportSplit() override { return true; } + +private: + std::mutex mutex; + std::optional total_rows; + + std::unique_ptr reader; + void prepareReader(); + + std::vector collectRowGroupIndices(); +}; + +} diff --git a/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp new file mode 100644 index 000000000000..1a9d9468244b --- /dev/null +++ b/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp @@ -0,0 +1,193 @@ +#include +#include +#include +#include +#include "IO/ReadSettings.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} +} + +namespace local_engine +{ + +class LocalFileReadBufferBuilder : public ReadBufferBuilder +{ +public: + explicit LocalFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_) {} + ~LocalFileReadBufferBuilder() override = default; + + std::unique_ptr build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override + { + Poco::URI file_uri(file_info.uri_file()); + std::unique_ptr read_buffer; + const String & file_path = file_uri.getPath(); + struct stat file_stat; + if (stat(file_path.c_str(), &file_stat)) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "file stat failed for {}", file_path); + + if (S_ISREG(file_stat.st_mode)) + read_buffer = std::make_unique(file_path); + else + read_buffer = std::make_unique(file_path); + return read_buffer; + } +}; + +class HDFSFileReadBufferBuilder : public ReadBufferBuilder +{ +public: + explicit HDFSFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_) {} + ~HDFSFileReadBufferBuilder() override = default; + + std::unique_ptr build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override + { + Poco::URI file_uri(file_info.uri_file()); + std::unique_ptr read_buffer; + /// Need to set "hdfs.libhdfs3_conf" in global settings + read_buffer = std::make_unique( + "hdfs://" + file_uri.getHost(), file_uri.getPath(), 
context->getGlobalContext()->getConfigRef()); + return read_buffer; + } +}; + +#if USE_AWS_S3 +class S3FileReadBufferBuilder : public ReadBufferBuilder +{ +public: + explicit S3FileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_) {} + ~S3FileReadBufferBuilder() override = default; + + std::unique_ptr build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override + { + Poco::URI file_uri(file_info.uri_file()); + auto client = getClient(); + std::unique_ptr readbuffer; + readbuffer + = std::make_unique(client, file_uri.getHost(), file_uri.getPath().substr(1), 3, DB::ReadSettings()); + return readbuffer; + } +private: + std::shared_ptr shared_client; + + std::shared_ptr getClient() + { + if (shared_client) + return shared_client; + const auto & config = context->getConfigRef(); + String config_prefix = "s3"; + DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration( + config.getString(config_prefix + ".region", ""), + context->getRemoteHostFilter(), + context->getGlobalContext()->getSettingsRef().s3_max_redirects); + + DB::S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint"))); + + client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000); + client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000); + client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100); + client_configuration.endpointOverride = uri.endpoint; + + client_configuration.retryStrategy + = std::make_shared(config.getUInt(config_prefix + ".retry_attempts", 10)); + + shared_client = DB::S3::ClientFactory::instance().create( + client_configuration, + uri.is_virtual_hosted_style, + config.getString(config_prefix + ".access_key_id", ""), + config.getString(config_prefix + ".secret_access_key", ""), + config.getString(config_prefix + ".server_side_encryption_customer_key_base64", ""), + {}, + config.getBool(config_prefix + ".use_environment_credentials", config.getBool("s3.use_environment_credentials", false)), + config.getBool(config_prefix + ".use_insecure_imds_request", config.getBool("s3.use_insecure_imds_request", false))); + return shared_client; + } +}; +#endif + +#if USE_AZURE_BLOB_STORAGE +class AzureBlobReadBuffer : public ReadBufferBuilder +{ +public: + explicit AzureBlobReadBuffer(DB::ContextPtr context_) : ReadBufferBuilder(context_) {} + ~AzureBlobReadBuffer() override = default; + + std::unique_ptr build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) + { + Poco::URI file_uri(file_info.uri_file()); + std::unique_ptr read_buffer; + read_buffer = std::make_unique(getClient(), file_uri.getPath(), 5, 5, DBMS_DEFAULT_BUFFER_SIZE); + return read_buffer; + } +private: + std::shared_ptr shared_client; + + std::shared_ptr getClient() + { + if (shared_client) + return shared_client; + shared_client = DB::getAzureBlobContainerClient(context->getConfigRef(), "blob"); + return shared_client; + } +}; +#endif + +void registerReadBufferBuildes(ReadBufferBuilderFactory & factory) +{ + LOG_TRACE(&Poco::Logger::get("ReadBufferBuilderFactory"), "+registerReadBufferBuildes"); + factory.registerBuilder("file", [](DB::ContextPtr context_) { return std::make_shared(context_); }); + factory.registerBuilder("hdfs", [](DB::ContextPtr context_) { return std::make_shared(context_); }); + +#if USE_AWS_S3 + factory.registerBuilder("s3", [](DB::ContextPtr context_) { return 
std::make_shared(context_); }); +#endif + +#if USE_AZURE_BLOB_STORAGE + factory.registerBuilder("wasb", [](DB::ContextPtr context_) { return std::make_shared(context_); }); + factory.registerBuilder("wasbs", [](DB::ContextPtr context_) { return std::make_shared(context_); }); +#endif +} + +ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance() +{ + static ReadBufferBuilderFactory instance; + return instance; +} + +ReadBufferBuilderPtr ReadBufferBuilderFactory::createBuilder(const String & schema, DB::ContextPtr context) +{ + auto it = builders.find(schema); + if (it == builders.end()) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Not found read buffer builder for {}", schema); + return it->second(context); +} + +void ReadBufferBuilderFactory::registerBuilder(const String & schema, NewBuilder newer) +{ + auto it = builders.find(schema); + if (it != builders.end()) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "readbuffer builder for {} has been registered", schema); + builders[schema] = newer; +} + +} diff --git a/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h b/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h new file mode 100644 index 000000000000..02234ed9bd68 --- /dev/null +++ b/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h @@ -0,0 +1,38 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +namespace local_engine +{ +class ReadBufferBuilder +{ +public: + explicit ReadBufferBuilder(DB::ContextPtr context_) : context(context_) {} + virtual ~ReadBufferBuilder() = default; + /// build a new read buffer + virtual std::unique_ptr build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) = 0; +protected: + DB::ContextPtr context; +}; + +using ReadBufferBuilderPtr = std::shared_ptr; + +class ReadBufferBuilderFactory : public boost::noncopyable +{ +public: + using NewBuilder = std::function; + static ReadBufferBuilderFactory & instance(); + ReadBufferBuilderPtr createBuilder(const String & schema, DB::ContextPtr context); + + void registerBuilder(const String & schema, NewBuilder newer); + +private: + std::map builders; +}; + +void registerReadBufferBuildes(ReadBufferBuilderFactory & factory); +} diff --git a/utils/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp b/utils/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp new file mode 100644 index 000000000000..573a8743ba4e --- /dev/null +++ b/utils/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp @@ -0,0 +1,294 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int UNKNOWN_TYPE; + extern const int LOGICAL_ERROR; +} +} +namespace local_engine +{ +SubstraitFileSource::SubstraitFileSource(DB::ContextPtr context_, const DB::Block & header_, const substrait::ReadRel::LocalFiles & file_infos) + : DB::SourceWithProgress(header_, false) + , context(context_) + , output_header(header_) +{ + if (!output_header.columns()) + { + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Empty columns to read. 
Maybe use count(col) instead of count(1)/count(*)");
+    }
+
+    to_read_header = output_header;
+    if (file_infos.items_size())
+    {
+        Poco::URI file_uri(file_infos.items().Get(0).uri_file());
+        read_buffer_builder = ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), context);
+        for (const auto & item : file_infos.items())
+        {
+            files.emplace_back(FormatFileUtil::createFile(context, read_buffer_builder, item));
+        }
+
+        auto partition_keys = files[0]->getFilePartitionKeys();
+        /// file partition keys are read from the file path
+        for (const auto & key : partition_keys)
+        {
+            to_read_header.erase(key);
+        }
+    }
+}
+
+
+DB::Chunk SubstraitFileSource::generate()
+{
+    while (current_file_index < files.size())
+    {
+        if (!tryPrepareReader())
+        {
+            /// all files finished
+            return {};
+        }
+
+        DB::Chunk chunk;
+        if (file_reader->pull(chunk))
+            return chunk;
+
+        /// try to read from next file
+        file_reader.reset();
+    }
+    return {};
+}
+
+bool SubstraitFileSource::tryPrepareReader()
+{
+    if (file_reader) [[likely]]
+        return true;
+
+    if (current_file_index >= files.size())
+        return false;
+
+    auto current_file = files[current_file_index];
+    current_file_index += 1;
+
+    if (!current_file->supportSplit() && current_file->getStartOffset())
+    {
+        /// For files that do not support the split strategy, only the task with offset 0 will generate data
+        file_reader = std::make_unique(current_file);
+        return true;
+    }
+    if (!to_read_header.columns())
+    {
+        auto total_rows = current_file->getTotalRows();
+        if (total_rows)
+        {
+            file_reader = std::make_unique(current_file, context, output_header, *total_rows);
+        }
+        else
+        {
+            /// TODO: The file may be in a text format for which we have no statistics metadata, e.g. total rows.
+            /// If we can get the file's schema, we could try to read all columns out; consider making this
+            /// scan fall back to Spark.
+            throw DB::Exception(
+                DB::ErrorCodes::LOGICAL_ERROR,
+                "All columns to read are partition columns, but this file({}) doesn't support this case.",
+                current_file->getURIPath());
+        }
+    }
+    else
+    {
+        file_reader = std::make_unique(current_file, context, to_read_header, output_header);
+    }
+
+    return true;
+}
+
+
+DB::ColumnPtr FileReaderWrapper::createConstColumn(DB::DataTypePtr data_type, const DB::Field & field, size_t rows)
+{
+    auto nested_type = DB::removeNullable(data_type);
+    auto column = nested_type->createColumnConst(rows, field);
+
+    if (data_type->isNullable())
+        column = DB::ColumnNullable::create(column, DB::ColumnUInt8::create(rows, 0));
+    return column;
+}
+
+#define BUILD_INT_FIELD(type) \
+    [](DB::ReadBuffer & in, const String &) \
+    {\
+        type value = 0;\
+        DB::readIntText(value, in);\
+        return DB::Field(value);\
+    }
+
+#define BUILD_FP_FIELD(type) \
+    [](DB::ReadBuffer & in, const String &) \
+    {\
+        type value = 0.0;\
+        DB::readFloatText(value, in);\
+        return DB::Field(value);\
+    }
+
+DB::Field FileReaderWrapper::buildFieldFromString(const String & str_value, DB::DataTypePtr type)
+{
+    using FieldBuilder = std::function;
+    static std::map field_builders
+        = {{magic_enum::enum_integer(DB::TypeIndex::Int8), BUILD_INT_FIELD(Int8) },
+           {magic_enum::enum_integer(DB::TypeIndex::Int16), BUILD_INT_FIELD(Int16) },
+           {magic_enum::enum_integer(DB::TypeIndex::Int32), BUILD_INT_FIELD(Int32) },
+           {magic_enum::enum_integer(DB::TypeIndex::Int64), BUILD_INT_FIELD(Int64) },
+           {magic_enum::enum_integer(DB::TypeIndex::Float32), BUILD_FP_FIELD(DB::Float32) },
+           {magic_enum::enum_integer(DB::TypeIndex::Float64), BUILD_FP_FIELD(DB::Float64)},
+           {magic_enum::enum_integer(DB::TypeIndex::String), [](DB::ReadBuffer &, const String & val) { return DB::Field(val); }},
+           {magic_enum::enum_integer(DB::TypeIndex::Date),
+            [](DB::ReadBuffer & in, const String &)
+            {
+                DayNum value;
+                readDateText(value, in);
+                return DB::Field(value);
+            }},
+           {magic_enum::enum_integer(DB::TypeIndex::Date32),
+            [](DB::ReadBuffer & in, const String &)
+            {
+                ExtendedDayNum value;
+                readDateText(value, in);
+                return DB::Field(value.toUnderType());
+            }}};
+
+    auto nested_type = DB::removeNullable(type);
+    DB::ReadBufferFromString read_buffer(str_value);
+    auto it = field_builders.find(magic_enum::enum_integer(nested_type->getTypeId()));
+    if (it == field_builders.end())
+        throw DB::Exception(DB::ErrorCodes::UNKNOWN_TYPE, "Unsupported data type {}", type->getFamilyName());
+    return it->second(read_buffer, str_value);
+}
+
+ConstColumnsFileReader::ConstColumnsFileReader(FormatFilePtr file_, DB::ContextPtr context_, const DB::Block & header_, size_t block_size_)
+    : FileReaderWrapper(file_)
+    , context(context_)
+    , header(header_)
+    , remained_rows(0)
+    , block_size(block_size_)
+{
+    auto rows = file->getTotalRows();
+    if (!rows)
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot get total row count from file: {}", file->getURIPath());
+    remained_rows = *rows;
+}
+
+bool ConstColumnsFileReader::pull(DB::Chunk & chunk)
+{
+    if (!remained_rows) [[unlikely]]
+        return false;
+    size_t to_read_rows = 0;
+    if (remained_rows < block_size)
+    {
+        to_read_rows = remained_rows;
+        remained_rows = 0;
+    }
+    else
+    {
+        to_read_rows = block_size;
+        remained_rows -= block_size;
+    }
+    DB::Columns res_columns;
+    size_t columns_num = header.columns();
+    res_columns.reserve(columns_num);
+    const auto & partition_values = file->getFilePartitionValues();
+    for (size_t pos = 
0; pos < columns_num; ++pos) + { + auto col_with_name_and_type = header.getByPosition(pos); + auto type = col_with_name_and_type.type; + const auto & name = col_with_name_and_type.name; + auto it = partition_values.find(name); + if (it == partition_values.end()) [[unlikely]] + { + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknow partition column : {}", name); + } + auto field = buildFieldFromString(it->second, type); + auto column = createConstColumn(type, field, to_read_rows); + res_columns.emplace_back(column); + } + + chunk = DB::Chunk(std::move(res_columns), to_read_rows); + return true; +} + +NormalFileReader::NormalFileReader(FormatFilePtr file_, DB::ContextPtr context_, const DB::Block & to_read_header_, const DB::Block & output_header_) + : FileReaderWrapper(file_) + , context(context_) + , to_read_header(to_read_header_) + , output_header(output_header_) +{ + input_format = file->createInputFormat(to_read_header); + DB::Pipe pipe(input_format->input); + pipeline = std::make_unique(std::move(pipe)); + reader = std::make_unique(*pipeline); +} + + +bool NormalFileReader::pull(DB::Chunk & chunk) +{ + DB::Chunk tmp_chunk; + auto status = reader->pull(tmp_chunk); + if (!status) + { + return false; + } + + size_t rows = tmp_chunk.getNumRows(); + if (!rows) + return false; + + auto read_columns = tmp_chunk.detachColumns(); + DB::Columns res_columns; + auto columns_with_name_and_type = output_header.getColumnsWithTypeAndName(); + auto partition_values = file->getFilePartitionValues(); + + for (auto & column : columns_with_name_and_type) + { + if (to_read_header.has(column.name)) + { + auto pos = to_read_header.getPositionByName(column.name); + res_columns.push_back(read_columns[pos]); + } + else + { + auto it = partition_values.find(column.name); + if (it == partition_values.end()) + { + throw DB::Exception( + DB::ErrorCodes::LOGICAL_ERROR, "Not found column({}) from file({}) partition keys.", column.name, file->getURIPath()); + } + auto field = buildFieldFromString(it->second, column.type); + res_columns.push_back(createConstColumn(column.type, field, rows)); + } + } + chunk = DB::Chunk(std::move(res_columns), rows); + return true; +} +} diff --git a/utils/local-engine/Storages/SubstraitSource/SubstraitFileSource.h b/utils/local-engine/Storages/SubstraitSource/SubstraitFileSource.h new file mode 100644 index 000000000000..99609b0dc292 --- /dev/null +++ b/utils/local-engine/Storages/SubstraitSource/SubstraitFileSource.h @@ -0,0 +1,96 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +namespace local_engine +{ + +class FileReaderWrapper +{ +public: + explicit FileReaderWrapper(FormatFilePtr file_) : file(file_) {} + virtual ~FileReaderWrapper() = default; + virtual bool pull(DB::Chunk & chunk) = 0; + +protected: + FormatFilePtr file; + + static DB::ColumnPtr createConstColumn(DB::DataTypePtr type, const DB::Field & field, size_t rows); + static DB::Field buildFieldFromString(const String & value, DB::DataTypePtr type); +}; + +class NormalFileReader : public FileReaderWrapper +{ +public: + NormalFileReader(FormatFilePtr file_, DB::ContextPtr context_, const DB::Block & to_read_header_, const DB::Block & output_header_); + ~NormalFileReader() override = default; + bool pull(DB::Chunk & chunk) override; + +private: + DB::ContextPtr context; + DB::Block to_read_header; + DB::Block output_header; + + FormatFile::InputFormatPtr input_format; + std::unique_ptr pipeline; + 
std::unique_ptr reader; +}; + +class EmptyFileReader : public FileReaderWrapper +{ +public: + explicit EmptyFileReader(FormatFilePtr file_) : FileReaderWrapper(file_) {} + ~EmptyFileReader() override = default; + bool pull(DB::Chunk &) override { return false; } +}; + +class ConstColumnsFileReader : public FileReaderWrapper +{ +public: + ConstColumnsFileReader(FormatFilePtr file_, DB::ContextPtr context_, const DB::Block & header_, size_t block_size_ = DEFAULT_BLOCK_SIZE); + ~ConstColumnsFileReader() override = default; + bool pull(DB::Chunk & chunk); +private: + DB::ContextPtr context; + DB::Block header; + size_t remained_rows; + size_t block_size; +}; + +class SubstraitFileSource : public DB::SourceWithProgress +{ +public: + SubstraitFileSource(DB::ContextPtr context_, const DB::Block & header_, const substrait::ReadRel::LocalFiles & file_infos); + ~SubstraitFileSource() override = default; + + String getName() const override + { + return "SubstraitFileSource"; + } +protected: + DB::Chunk generate() override; +private: + DB::ContextPtr context; + DB::Block output_header; + DB::Block to_read_header; + FormatFiles files; + + UInt32 current_file_index = 0; + std::unique_ptr file_reader; + ReadBufferBuilderPtr read_buffer_builder; + + bool tryPrepareReader(); +}; +} diff --git a/utils/local-engine/local_engine_jni.cpp b/utils/local-engine/local_engine_jni.cpp index cc1c2d36619e..8a557b25e3bf 100644 --- a/utils/local-engine/local_engine_jni.cpp +++ b/utils/local-engine/local_engine_jni.cpp @@ -21,6 +21,7 @@ #include #include #include +#include bool inside_main = true; @@ -137,6 +138,7 @@ jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/) = local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "next", "()[B"); local_engine::JNIUtils::vm = vm; + local_engine::registerReadBufferBuildes(local_engine::ReadBufferBuilderFactory::instance()); return JNI_VERSION_1_8; } diff --git a/utils/local-engine/tests/benchmark_local_engine.cpp b/utils/local-engine/tests/benchmark_local_engine.cpp index 9a4d066c3ecd..3674faa60b52 100644 --- a/utils/local-engine/tests/benchmark_local_engine.cpp +++ b/utils/local-engine/tests/benchmark_local_engine.cpp @@ -22,7 +22,7 @@ #include #include #include -#include +#include #include #include #include @@ -34,6 +34,7 @@ #include #include #include +#include #include "testConfig.h" #if defined(__SSE2__) @@ -168,13 +169,16 @@ DB::ContextMutablePtr global_context; for (auto _ : state) { - auto files = std::make_shared(); - files->files = { - "file:///home/hongbin/code/gluten/jvm/src/test/resources/tpch-data/lineitem/" - "part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet", - }; + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items(); + std::string file_path = "file:///home/hongbin/code/gluten/jvm/src/test/resources/tpch-data/lineitem/" + "part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet"; + file->set_uri_file(file_path); + substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file->mutable_parquet()->CopyFrom(parquet_format); auto builder = std::make_unique(); - builder->init(Pipe(std::make_shared(files, header, SerializedPlanParser::global_context))); + builder->init(Pipe(std::make_shared(SerializedPlanParser::global_context, header, files))); + auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto executor = PullingPipelineExecutor(pipeline); auto result = header.cloneEmpty(); diff --git 
a/utils/local-engine/tests/benchmark_parquet_read.cpp b/utils/local-engine/tests/benchmark_parquet_read.cpp index 47636166bb65..3d6bef2f084a 100644 --- a/utils/local-engine/tests/benchmark_parquet_read.cpp +++ b/utils/local-engine/tests/benchmark_parquet_read.cpp @@ -16,8 +16,9 @@ #include #include #include -#include +#include #include +#include static void BM_ParquetReadString(benchmark::State& state) { @@ -82,12 +83,15 @@ static void BM_OptimizedParquetReadString(benchmark::State& state) for (auto _ : state) { - auto files_info = std::make_shared(); - files_info->files = {file}; + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file_item = files.add_items(); + file_item->set_uri_file(file); + substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file_item->mutable_parquet()->CopyFrom(parquet_format); auto builder = std::make_unique(); - builder->init(Pipe(std::make_shared( - files_info, header, local_engine::SerializedPlanParser::global_context))); + builder->init( + Pipe(std::make_shared(local_engine::SerializedPlanParser::global_context, header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto reader = PullingPipelineExecutor(pipeline); while (reader.pull(res)) @@ -112,12 +116,15 @@ static void BM_OptimizedParquetReadDate32(benchmark::State& state) for (auto _ : state) { - auto files_info = std::make_shared(); - files_info->files = {file}; + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file_item = files.add_items(); + file_item->set_uri_file(file); + substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file_item->mutable_parquet()->CopyFrom(parquet_format); auto builder = std::make_unique(); - builder->init(Pipe(std::make_shared( - files_info, header, local_engine::SerializedPlanParser::global_context))); + builder->init( + Pipe(std::make_shared(local_engine::SerializedPlanParser::global_context, header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto reader = PullingPipelineExecutor(pipeline); while (reader.pull(res)) diff --git a/utils/local-engine/tests/gtest_ch_join.cpp b/utils/local-engine/tests/gtest_ch_join.cpp index 3fc258c3740b..28af2d08c000 100644 --- a/utils/local-engine/tests/gtest_ch_join.cpp +++ b/utils/local-engine/tests/gtest_ch_join.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include #include @@ -16,6 +16,7 @@ #include #include +#include using namespace DB; diff --git a/utils/local-engine/tests/gtest_ch_storages.cpp b/utils/local-engine/tests/gtest_ch_storages.cpp index 23804b26db33..8e3ce010843f 100644 --- a/utils/local-engine/tests/gtest_ch_storages.cpp +++ b/utils/local-engine/tests/gtest_ch_storages.cpp @@ -1,12 +1,13 @@ #include #include -#include +#include #include #include #include #include #include #include +#include using namespace DB; using namespace local_engine; @@ -24,8 +25,12 @@ TEST(TestBatchParquetFileSource, blob) "devstoreaccount1;"); auto builder = std::make_unique(); - auto files = std::make_shared(); - files->files = {"wasb://libch/parquet/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"}; + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items(); + std::string file_path = "wasb://libch/parquet/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"; + file->set_uri_file(file_path); + 
substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file->mutable_parquet()->CopyFrom(parquet_format); const auto * type_string = "columns format version: 1\n" "15 columns:\n" @@ -55,7 +60,7 @@ TEST(TestBatchParquetFileSource, blob) columns.emplace_back(std::move(col)); } auto header = Block(std::move(columns)); - builder->init(Pipe(std::make_shared(files, header, SerializedPlanParser::global_context))); + builder->init(Pipe(std::make_shared(SerializedPlanParser::global_context, header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto executor = PullingPipelineExecutor(pipeline); @@ -83,9 +88,12 @@ TEST(TestBatchParquetFileSource, s3) config->setString("s3.secret_access_key", "password"); auto builder = std::make_unique(); - auto files = std::make_shared(); - files->files = {"s3://tpch/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"}; - // files->files = {"file:///home/saber/Downloads/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"}; + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items(); + std::string file_path = "s3://tpch/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"; + file->set_uri_file(file_path); + substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file->mutable_parquet()->CopyFrom(parquet_format); const auto * type_string = "columns format version: 1\n" "15 columns:\n" @@ -115,7 +123,7 @@ TEST(TestBatchParquetFileSource, s3) columns.emplace_back(std::move(col)); } auto header = Block(std::move(columns)); - builder->init(Pipe(std::make_shared(files, header, SerializedPlanParser::global_context))); + builder->init(Pipe(std::make_shared(SerializedPlanParser::global_context, header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto executor = PullingPipelineExecutor(pipeline); @@ -137,12 +145,18 @@ TEST(TestBatchParquetFileSource, s3) TEST(TestBatchParquetFileSource, local_file) { auto builder = std::make_unique(); - auto files = std::make_shared(); - files->files = { - "file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet", - "file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00001-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet", - "file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00002-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet", - }; + + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items(); + file->set_uri_file("file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"); + substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file->mutable_parquet()->CopyFrom(parquet_format); + file = files.add_items(); + file->set_uri_file("file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00001-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"); + file->mutable_parquet()->CopyFrom(parquet_format); + file = files.add_items(); + file->set_uri_file("file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00002-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"); + file->mutable_parquet()->CopyFrom(parquet_format); const auto * type_string = "columns format version: 1\n" "2 columns:\n" @@ -172,7 +186,7 @@ 
TEST(TestBatchParquetFileSource, local_file) columns.emplace_back(std::move(col)); } auto header = Block(std::move(columns)); - builder->init(Pipe(std::make_shared(files, header, SerializedPlanParser::global_context))); + builder->init(Pipe(std::make_shared(SerializedPlanParser::global_context, header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto executor = PullingPipelineExecutor(pipeline); @@ -234,9 +248,12 @@ TEST(TestWrite, MergeTreeWriteTest) std::move(settings) ); - auto files_info = std::make_shared(); - files_info->files = {"s3://tpch/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"}; - auto source = std::make_shared(files_info, metadata->getSampleBlock(), SerializedPlanParser::global_context); + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items(); + file->set_uri_file("s3://tpch/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"); + substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file->mutable_parquet()->CopyFrom(parquet_format); + auto source = std::make_shared(SerializedPlanParser::global_context, metadata->getSampleBlock(), files); QueryPipelineBuilder query_pipeline_builder; query_pipeline_builder.init(Pipe(source)); diff --git a/utils/local-engine/tests/gtest_local_engine.cpp b/utils/local-engine/tests/gtest_local_engine.cpp index d8e90f2feb59..49a4daf880f6 100644 --- a/utils/local-engine/tests/gtest_local_engine.cpp +++ b/utils/local-engine/tests/gtest_local_engine.cpp @@ -14,13 +14,14 @@ #include #include #include -#include +#include #include #include #include #include #include #include +#include #include "Storages/CustomStorageMergeTree.h" #include "testConfig.h" @@ -205,9 +206,14 @@ TEST(TestSelect, MergeTreeWriteTest) DB::StorageID("default", "test"), "test-intel/", *metadata, false, global_context, "", param, std::move(settings)); auto sink = std::make_shared(custom_merge_tree, metadata, global_context); - auto files_info = std::make_shared(); - files_info->files.push_back("/home/kyligence/Documents/test-dataset/intel-gazelle-test-150.snappy.parquet"); - auto source = std::make_shared(files_info, metadata->getSampleBlock(), SerializedPlanParser::global_context); + + substrait::ReadRel::LocalFiles files; + substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items(); + std::string file_path = "file:///home/kyligence/Documents/test-dataset/intel-gazelle-test-150.snappy.parquet"; + file->set_uri_file(file_path); + substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; + file->mutable_parquet()->CopyFrom(parquet_format); + auto source = std::make_shared(SerializedPlanParser::global_context, metadata->getSampleBlock(), files); QueryPipelineBuilder query_pipeline; query_pipeline.init(Pipe(source));
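
Reviewer note: the tests above already exercise the new read path end to end; the following is a minimal usage sketch for quick reference, not part of the patch. It assumes the environment the tests rely on (an initialized SerializedPlanParser::global_context, registerReadBufferBuildes() already called as in JNI_OnLoad, and this repository's include paths); the helper name readLocalParquet and the parquet path are placeholders.

    #include <Parser/SerializedPlanParser.h>
    #include <Storages/SubstraitSource/SubstraitFileSource.h>
    #include <QueryPipeline/QueryPipelineBuilder.h>
    #include <Processors/Executors/PullingPipelineExecutor.h>

    using namespace DB;
    using namespace local_engine;

    static void readLocalParquet(const Block & header)
    {
        /// Describe the input with substrait LocalFiles (replaces the removed FilesInfo struct).
        substrait::ReadRel::LocalFiles files;
        substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items();
        file->set_uri_file("file:///tmp/example.snappy.parquet"); /// placeholder path
        substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format;
        file->mutable_parquet()->CopyFrom(parquet_format);

        /// SubstraitFileSource selects a ReadBufferBuilder from the URI scheme ("file", "hdfs", "s3", "wasb"/"wasbs")
        /// and a FormatFile implementation from the format options of each item.
        auto source = std::make_shared<SubstraitFileSource>(SerializedPlanParser::global_context, header, files);

        QueryPipelineBuilder builder;
        builder.init(Pipe(source));
        auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
        auto executor = PullingPipelineExecutor(pipeline);

        Block block = header.cloneEmpty();
        while (executor.pull(block))
        {
            /// partition columns parsed from the file path arrive as constant columns in each block
        }
    }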