diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index 954f11d54976..96b65c7248d3 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -30,12 +30,13 @@
 #include
 #include
 #include
-#include
 #include
+#include
 #include
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -49,6 +50,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -58,25 +60,23 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
-#include
-#include
 #include
 #include
 #include "CHUtil.h"
-#include
 #include
+#include
 namespace DB
 {
 namespace ErrorCodes
 {
- extern const int BAD_ARGUMENTS;
+extern const int BAD_ARGUMENTS;
 }
 }
@@ -304,7 +304,7 @@ size_t PODArrayUtil::adjustMemoryEfficientSize(size_t n)
 }
 else
 {
- padded_n = rounded_n - padding_n;
+ padded_n = rounded_n - padding_n;
 }
 return padded_n;
 }
@@ -326,9 +326,7 @@ std::string PlanUtil::explainPlan(DB::QueryPlan & plan)
 std::vector MergeTreeUtil::getAllMergeTreeParts(const Path & storage_path)
 {
 if (!fs::exists(storage_path))
- {
 throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid merge tree store path:{}", storage_path.string());
- }
 // TODO: May need to check the storage format version
 std::vector res;
@@ -346,9 +344,7 @@ DB::NamesAndTypesList MergeTreeUtil::getSchemaFromMergeTreePart(const fs::path &
 {
 DB::NamesAndTypesList names_types_list;
 if (!fs::exists(part_path))
- {
 throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid merge tree store path:{}", part_path.string());
- }
 DB::ReadBufferFromFile readbuffer((part_path / "columns.txt").string());
 names_types_list.readText(readbuffer);
 return names_types_list;
@@ -388,9 +384,7 @@ std::optional NestedColumnExtractHelper::extractColum
 {
 auto table_iter = nested_tables.find(column_name_prefix);
 if (table_iter == nested_tables.end())
- {
 return {};
- }
 auto & nested_table = table_iter->second;
 auto nested_names = DB::Nested::splitName(column_name_suffix);
@@ -412,9 +406,7 @@ std::optional NestedColumnExtractHelper::extractColum
 const auto * sub_col = findColumn(*nested_table, new_column_name_prefix);
 if (!sub_col)
- {
 return {};
- }
 DB::ColumnsWithTypeAndName columns = {*sub_col};
 DB::Block sub_block(columns);
@@ -431,9 +423,7 @@ const DB::ColumnWithTypeAndName * NestedColumnExtractHelper::findColumn(const DB
 const auto & cols = in_block.getColumnsWithTypeAndName();
 auto found = std::find_if(cols.begin(), cols.end(), [&](const auto & column) { return boost::iequals(column.name, name); });
 if (found == cols.end())
- {
 return nullptr;
- }
 return &*found;
 }
@@ -476,9 +466,7 @@ std::map BackendInitializerUtil::getBackendConfMap(std
 {
 std::map ch_backend_conf;
 if (plan == nullptr)
- {
 return ch_backend_conf;
- }
 /// Parse backend configs from plan extensions
 do
@@ -576,9 +564,9 @@ void BackendInitializerUtil::initLoggers(DB::Context::ConfigurationPtr config)
 {
 auto level = config->getString("logger.level", "warning");
 if (config->has("logger.log"))
- local_engine::Logger::initFileLogger(*config, "ClickHouseBackend");
+ local_engine::LoggerExtend::initFileLogger(*config, "ClickHouseBackend");
 else
- local_engine::Logger::initConsoleLogger(level);
+ local_engine::LoggerExtend::initConsoleLogger(level);
 logger = &Poco::Logger::get("ClickHouseBackend");
 }
@@ -675,9 +663,7 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
 /// Make sure global_context and shared_context are constructed only once.
 auto & shared_context = SerializedPlanParser::shared_context;
 if (!shared_context.get())
- {
 shared_context = SharedContextHolder(Context::createShared());
- }
 auto & global_context = SerializedPlanParser::global_context;
 if (!global_context)
@@ -856,9 +842,7 @@ UInt64 MemoryUtil::getCurrentMemoryUsage(size_t depth)
 Int64 current_memory_usage = 0;
 auto * current_mem_tracker = DB::CurrentThread::getMemoryTracker();
 for (size_t i = 0; i < depth && current_mem_tracker; ++i)
- {
 current_mem_tracker = current_mem_tracker->getParent();
- }
 if (current_mem_tracker)
 current_memory_usage = current_mem_tracker->get();
 return current_memory_usage < 0 ? 0 : current_memory_usage;
@@ -867,12 +851,11 @@ UInt64 MemoryUtil::getMemoryRSS()
 {
 long rss = 0L;
- FILE* fp = NULL;
+ FILE * fp = NULL;
 char buf[4096];
 sprintf(buf, "/proc/%d/statm", getpid());
- if ((fp = fopen(buf, "r")) == NULL) {
+ if ((fp = fopen(buf, "r")) == NULL)
 return 0;
- }
 fscanf(fp, "%*s%ld", &rss);
 fclose(fp);
 return rss * sysconf(_SC_PAGESIZE);
diff --git a/cpp-ch/local-engine/Common/Logger.cpp b/cpp-ch/local-engine/Common/LoggerExtend.cpp
similarity index 88%
rename from cpp-ch/local-engine/Common/Logger.cpp
rename to cpp-ch/local-engine/Common/LoggerExtend.cpp
index e322f6ecbc8e..737113f55457 100644
--- a/cpp-ch/local-engine/Common/Logger.cpp
+++ b/cpp-ch/local-engine/Common/LoggerExtend.cpp
@@ -14,24 +14,24 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-#include "Logger.h"
-
+#include "LoggerExtend.h"
 #include
 #include
 #include
 #include
-#include
 #include
+#include
 #include
-
 using Poco::AsyncChannel;
 using Poco::AutoPtr;
 using Poco::ConsoleChannel;
-using Poco::PatternFormatter;
 using Poco::FormattingChannel;
+using Poco::PatternFormatter;
-void local_engine::Logger::initConsoleLogger(const std::string & level)
+namespace local_engine
+{
+void LoggerExtend::initConsoleLogger(const std::string & level)
 {
 AutoPtr chan(new ConsoleChannel);
@@ -46,8 +46,9 @@ void local_engine::Logger::initConsoleLogger(const std::string & level)
 Poco::Logger::root().setLevel(level);
 }
-void local_engine::Logger::initFileLogger(Poco::Util::AbstractConfiguration & config, const std::string & cmd_name)
+void LoggerExtend::initFileLogger(Poco::Util::AbstractConfiguration & config, const std::string & cmd_name)
 {
 static Loggers loggers;
 loggers.buildLoggers(config, Poco::Logger::root(), cmd_name);
 }
+}
diff --git a/cpp-ch/local-engine/Common/Logger.h b/cpp-ch/local-engine/Common/LoggerExtend.h
similarity index 96%
rename from cpp-ch/local-engine/Common/Logger.h
rename to cpp-ch/local-engine/Common/LoggerExtend.h
index 16f77ddf0b15..5cb49cc18ab8 100644
--- a/cpp-ch/local-engine/Common/Logger.h
+++ b/cpp-ch/local-engine/Common/LoggerExtend.h
@@ -16,12 +16,11 @@
 */
 #pragma once
-#include
 #include
 namespace local_engine
 {
-class Logger
+class LoggerExtend
 {
 public:
 static void initConsoleLogger(const std::string & level = "error");
diff --git a/cpp-ch/local-engine/examples/signal_demo.cpp b/cpp-ch/local-engine/examples/signal_demo.cpp
index 85656ed60707..ffb20cf35293 100644
--- a/cpp-ch/local-engine/examples/signal_demo.cpp
+++ b/cpp-ch/local-engine/examples/signal_demo.cpp
@@ -15,29 +15,28 @@
 * limitations under the License.
 */
+#include
 #include
 #include
-#include
 #include
-#include
-#include
-
 using namespace DB;
 using namespace local_engine;
-
-int main(int /*argc*/, char * /*argv*/[])
+int main(int /*argc*/, char * /*argv*/[])
 {
- local_engine::Logger::initConsoleLogger("trace");
+ local_engine::LoggerExtend::initConsoleLogger("trace");
 Poco::Logger * logger = &Poco::Logger::get("signal_demo");
 SignalHandler::instance().init();
- for (int j = 0; j < 10 ; j++) {
+ for (int j = 0; j < 10; j++)
+ {
 LOG_TRACE(logger, "counter {}", j);
- if( j ){
- int *x = nullptr;
+ if (j)
+ {
+ int * x = nullptr;
 *x = 1;
 }
 sleepForSeconds(3);
diff --git a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
index 9a88d114a9ef..0b64237800ca 100644
--- a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
@@ -47,7 +47,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -66,74 +65,6 @@ using namespace dbms;
 DB::ContextMutablePtr global_context;
-[[maybe_unused]] static void BM_MergeTreeRead(benchmark::State & state)
-{
- std::shared_ptr metadata = std::make_shared();
- ColumnsDescription columns_description;
- auto int64_type = std::make_shared();
- auto int32_type = std::make_shared();
- auto double_type = std::make_shared();
-
- const auto * type_string = "columns format version: 1\n"
- "3 columns:\n"
- "`l_orderkey` Int64\n"
- "`l_commitdate` Date\n"
- "`l_receiptdate` Date\n"
- "`l_shipinstruct` String\n"
- "`l_shipmode` String\n"
- "`l_comment` String\n";
- auto names_and_types_list = NamesAndTypesList::parse(type_string);
- metadata = local_engine::buildMetaData(names_and_types_list, global_context);
- auto param = DB::MergeTreeData::MergingParams();
- auto settings = local_engine::buildMergeTreeSettings();
-
- local_engine::CustomStorageMergeTree custom_merge_tree(
- DB::StorageID("default", "test"),
- "data0/tpch100_zhichao/mergetree_nullable/lineitem",
- *metadata,
- false,
- global_context,
- "",
- param,
- std::move(settings));
- auto snapshot = std::make_shared(custom_merge_tree, metadata);
- custom_merge_tree.loadDataParts(false, {});
- for (auto _ : state)
- {
- state.PauseTiming();
- auto query_info = local_engine::buildQueryInfo(names_and_types_list);
- auto data_parts = custom_merge_tree.getDataPartsVectorForInternalUsage();
- int min_block = 0;
- int max_block = state.range(0);
- MergeTreeData::DataPartsVector selected_parts;
- std::copy_if(
- std::begin(data_parts),
- std::end(data_parts),
- std::inserter(selected_parts, std::begin(selected_parts)),
- [min_block, max_block](MergeTreeData::DataPartPtr part)
- { return part->info.min_block >= min_block && part->info.max_block <= max_block; });
- auto step = custom_merge_tree.reader.readFromParts(
- selected_parts, {}, names_and_types_list.getNames(), snapshot, *query_info, global_context, 10000, 1);
-
- auto query_plan = QueryPlan();
- query_plan.addStep(std::move(step));
-
- QueryPlanOptimizationSettings optimization_settings{.optimize_plan = false};
- auto query_pipeline = query_plan.buildQueryPipeline(optimization_settings, {});
-
- state.ResumeTiming();
- auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*query_pipeline));
- auto executor = PullingPipelineExecutor(pipeline);
- Chunk chunk;
- int sum = 0;
- while (executor.pull(chunk))
- {
- sum += chunk.getNumRows();
- }
- std::cerr << "rows:" << sum << std::endl;
- }
-}
-
 [[maybe_unused]] static void BM_ParquetRead(benchmark::State & state)
 {
 const auto
* type_string = "columns format version: 1\n" @@ -177,183 +108,6 @@ DB::ContextMutablePtr global_context; } } -[[maybe_unused]] static void BM_ShuffleSplitter(benchmark::State & state) -{ - std::shared_ptr metadata = std::make_shared(); - ColumnsDescription columns_description; - auto int64_type = std::make_shared(); - auto int32_type = std::make_shared(); - auto double_type = std::make_shared(); - const auto * type_string = "columns format version: 1\n" - "15 columns:\n" - "`l_partkey` Int64\n" - "`l_suppkey` Int64\n" - "`l_linenumber` Int32\n" - "`l_quantity` Float64\n" - "`l_extendedprice` Float64\n" - "`l_discount` Float64\n" - "`l_tax` Float64\n" - "`l_returnflag` String\n" - "`l_linestatus` String\n" - "`l_shipdate` Date\n" - "`l_commitdate` Date\n" - "`l_receiptdate` Date\n" - "`l_shipinstruct` String\n" - "`l_shipmode` String\n" - "`l_comment` String\n"; - auto names_and_types_list = NamesAndTypesList::parse(type_string); - metadata = local_engine::buildMetaData(names_and_types_list, global_context); - auto param = DB::MergeTreeData::MergingParams(); - auto settings = local_engine::buildMergeTreeSettings(); - - local_engine::CustomStorageMergeTree custom_merge_tree( - DB::StorageID("default", "test"), - "home/saber/Documents/data/mergetree", - *metadata, - false, - global_context, - "", - param, - std::move(settings)); - custom_merge_tree.loadDataParts(false, {}); - auto snapshot = std::make_shared(custom_merge_tree, metadata); - for (auto _ : state) - { - state.PauseTiming(); - auto query_info = local_engine::buildQueryInfo(names_and_types_list); - auto data_parts = custom_merge_tree.getDataPartsVectorForInternalUsage(); - int min_block = 0; - int max_block = state.range(0); - MergeTreeData::DataPartsVector selected_parts; - std::copy_if( - std::begin(data_parts), - std::end(data_parts), - std::inserter(selected_parts, std::begin(selected_parts)), - [min_block, max_block](MergeTreeData::DataPartPtr part) - { return part->info.min_block >= min_block && part->info.max_block <= max_block; }); - - auto step = custom_merge_tree.reader.readFromParts( - selected_parts, {}, names_and_types_list.getNames(), snapshot, *query_info, global_context, 10000, 1); - auto query_plan = QueryPlan(); - query_plan.addStep(std::move(step)); - - QueryPlanOptimizationSettings optimization_settings{.optimize_plan = false}; - auto query_pipeline = query_plan.buildQueryPipeline(optimization_settings, {}); - - auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*query_pipeline)); - state.ResumeTiming(); - auto executor = PullingPipelineExecutor(pipeline); - Block chunk = executor.getHeader(); - int sum = 0; - auto root = "/tmp/test_shuffle/" + local_engine::ShuffleSplitter::compress_methods[state.range(1)]; - local_engine::SplitOptions options{ - .split_size = 8192, - .io_buffer_size = DBMS_DEFAULT_BUFFER_SIZE, - .data_file = root + "/data.dat", - .map_id = 1, - .partition_num = 4, - .compress_method = local_engine::ShuffleSplitter::compress_methods[state.range(1)]}; - auto splitter = local_engine::ShuffleSplitter::create("rr", options); - while (executor.pull(chunk)) - { - sum += chunk.rows(); - splitter->split(chunk); - } - splitter->stop(); - splitter->writeIndexFile(); - std::cout << sum << "\n"; - } -} - -[[maybe_unused]] static void BM_HashShuffleSplitter(benchmark::State & state) -{ - std::shared_ptr metadata = std::make_shared(); - ColumnsDescription columns_description; - auto int64_type = std::make_shared(); - auto int32_type = std::make_shared(); - auto double_type = std::make_shared(); - 
const auto * type_string = "columns format version: 1\n" - "15 columns:\n" - "`l_partkey` Int64\n" - "`l_suppkey` Int64\n" - "`l_linenumber` Int32\n" - "`l_quantity` Float64\n" - "`l_extendedprice` Float64\n" - "`l_discount` Float64\n" - "`l_tax` Float64\n" - "`l_returnflag` String\n" - "`l_linestatus` String\n" - "`l_shipdate` Date\n" - "`l_commitdate` Date\n" - "`l_receiptdate` Date\n" - "`l_shipinstruct` String\n" - "`l_shipmode` String\n" - "`l_comment` String\n"; - auto names_and_types_list = NamesAndTypesList::parse(type_string); - metadata = local_engine::buildMetaData(names_and_types_list, global_context); - auto param = DB::MergeTreeData::MergingParams(); - auto settings = local_engine::buildMergeTreeSettings(); - - local_engine::CustomStorageMergeTree custom_merge_tree( - DB::StorageID("default", "test"), - "home/saber/Documents/data/mergetree", - *metadata, - false, - global_context, - "", - param, - std::move(settings)); - custom_merge_tree.loadDataParts(false, {}); - auto snapshot = std::make_shared(custom_merge_tree, metadata); - - for (auto _ : state) - { - state.PauseTiming(); - auto query_info = local_engine::buildQueryInfo(names_and_types_list); - auto data_parts = custom_merge_tree.getDataPartsVectorForInternalUsage(); - int min_block = 0; - int max_block = state.range(0); - MergeTreeData::DataPartsVector selected_parts; - std::copy_if( - std::begin(data_parts), - std::end(data_parts), - std::inserter(selected_parts, std::begin(selected_parts)), - [min_block, max_block](MergeTreeData::DataPartPtr part) - { return part->info.min_block >= min_block && part->info.max_block <= max_block; }); - - auto step = custom_merge_tree.reader.readFromParts( - selected_parts, {}, names_and_types_list.getNames(), snapshot, *query_info, global_context, 10000, 1); - auto query_plan = QueryPlan(); - query_plan.addStep(std::move(step)); - - QueryPlanOptimizationSettings optimization_settings{.optimize_plan = false}; - auto query_pipeline = query_plan.buildQueryPipeline(optimization_settings, {}); - - auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*query_pipeline)); - state.ResumeTiming(); - auto executor = PullingPipelineExecutor(pipeline); - Block chunk = executor.getHeader(); - int sum = 0; - auto root = "/tmp/test_shuffle/" + local_engine::ShuffleSplitter::compress_methods[state.range(1)]; - local_engine::SplitOptions options{ - .split_size = 8192, - .io_buffer_size = DBMS_DEFAULT_BUFFER_SIZE, - .data_file = root + "/data.dat", - .map_id = 1, - .partition_num = 4, - .compress_method = local_engine::ShuffleSplitter::compress_methods[state.range(1)]}; - auto splitter = local_engine::ShuffleSplitter::create("hash", options); - while (executor.pull(chunk)) - { - sum += chunk.rows(); - splitter->split(chunk); - } - splitter->stop(); - splitter->writeIndexFile(); - std::cout << sum << "\n"; - } -} - [[maybe_unused]] static void BM_ShuffleReader(benchmark::State & state) { for (auto _ : state) @@ -1062,85 +816,6 @@ class FasterCompressedReadBuffer : public FasterCompressedReadBufferBase, public #include -[[maybe_unused]] static void BM_CHColumnToSparkRowNew(benchmark::State & state) -{ - std::shared_ptr metadata = std::make_shared(); - ColumnsDescription columns_description; - auto int64_type = std::make_shared(); - auto int32_type = std::make_shared(); - auto double_type = std::make_shared(); - const auto * type_string = "columns format version: 1\n" - "15 columns:\n" - "`l_partkey` Int64\n" - "`l_suppkey` Int64\n" - "`l_linenumber` Int32\n" - "`l_quantity` Float64\n" - 
"`l_extendedprice` Float64\n" - "`l_discount` Float64\n" - "`l_tax` Float64\n" - "`l_returnflag` String\n" - "`l_linestatus` String\n" - "`l_shipdate` Date\n" - "`l_commitdate` Date\n" - "`l_receiptdate` Date\n" - "`l_shipinstruct` String\n" - "`l_shipmode` String\n" - "`l_comment` String\n"; - auto names_and_types_list = NamesAndTypesList::parse(type_string); - metadata = local_engine::buildMetaData(names_and_types_list, global_context); - auto param = DB::MergeTreeData::MergingParams(); - auto settings = local_engine::buildMergeTreeSettings(); - - local_engine::CustomStorageMergeTree custom_merge_tree( - DB::StorageID("default", "test"), - "data1/tpc_data/tpch10_liuneng/mergetree/lineitem", - *metadata, - false, - global_context, - "", - param, - std::move(settings)); - auto snapshot = std::make_shared(custom_merge_tree, metadata); - custom_merge_tree.loadDataParts(false, {}); - for (auto _ : state) - { - state.PauseTiming(); - auto query_info = local_engine::buildQueryInfo(names_and_types_list); - auto data_parts = custom_merge_tree.getDataPartsVectorForInternalUsage(); - int min_block = 0; - int max_block = 10; - MergeTreeData::DataPartsVector selected_parts; - std::copy_if( - std::begin(data_parts), - std::end(data_parts), - std::inserter(selected_parts, std::begin(selected_parts)), - [min_block, max_block](MergeTreeData::DataPartPtr part) - { return part->info.min_block >= min_block && part->info.max_block <= max_block; }); - - auto step = custom_merge_tree.reader.readFromParts( - selected_parts, {}, names_and_types_list.getNames(), snapshot, *query_info, global_context, 10000, 1); - auto query_plan = QueryPlan(); - query_plan.addStep(std::move(step)); - - QueryPlanOptimizationSettings optimization_settings{.optimize_plan = false}; - auto query_pipeline = query_plan.buildQueryPipeline(optimization_settings, {}); - - state.ResumeTiming(); - auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*query_pipeline)); - auto executor = PullingPipelineExecutor(pipeline); - Block header = executor.getHeader(); - CHColumnToSparkRow converter; - int sum = 0; - while (executor.pull(header)) - { - sum += header.rows(); - auto spark_row = converter.convertCHColumnToSparkRow(header); - converter.freeMem(spark_row->getBufferAddress(), spark_row->getTotalBytes()); - } - std::cerr << "rows: " << sum << std::endl; - } -} - struct MergeTreeWithSnapshot { std::shared_ptr merge_tree; @@ -1148,18 +823,6 @@ struct MergeTreeWithSnapshot NamesAndTypesList columns; }; -MergeTreeWithSnapshot buildMergeTree(NamesAndTypesList names_and_types, std::string relative_path, std::string table) -{ - auto metadata = local_engine::buildMetaData(names_and_types, global_context); - auto param = DB::MergeTreeData::MergingParams(); - auto settings = local_engine::buildMergeTreeSettings(); - std::shared_ptr custom_merge_tree = std::make_shared( - DB::StorageID("default", table), relative_path, *metadata, false, global_context, "", param, std::move(settings)); - auto snapshot = std::make_shared(*custom_merge_tree, metadata); - custom_merge_tree->loadDataParts(false, {}); - return MergeTreeWithSnapshot{.merge_tree = custom_merge_tree, .snapshot = snapshot, .columns = names_and_types}; -} - QueryPlanPtr readFromMergeTree(MergeTreeWithSnapshot storage) { auto query_info = local_engine::buildQueryInfo(storage.columns); @@ -1221,61 +884,6 @@ QueryPlanPtr joinPlan(QueryPlanPtr left, QueryPlanPtr right, String left_key, St return query_plan; } -[[maybe_unused]] static void BM_JoinTest(benchmark::State & state) -{ - 
std::shared_ptr metadata = std::make_shared(); - ColumnsDescription columns_description; - auto int64_type = std::make_shared(); - auto int32_type = std::make_shared(); - auto double_type = std::make_shared(); - const auto * supplier_type_string = "columns format version: 1\n" - "2 columns:\n" - "`s_suppkey` Int64\n" - "`s_nationkey` Int64\n"; - auto supplier_types = NamesAndTypesList::parse(supplier_type_string); - auto supplier = buildMergeTree(supplier_types, "home/saber/Documents/data/tpch/mergetree/supplier", "supplier"); - - const auto * nation_type_string = "columns format version: 1\n" - "1 columns:\n" - "`n_nationkey` Int64\n"; - auto nation_types = NamesAndTypesList::parse(nation_type_string); - auto nation = buildMergeTree(nation_types, "home/saber/Documents/data/tpch/mergetree/nation", "nation"); - - - const auto * partsupp_type_string = "columns format version: 1\n" - "3 columns:\n" - "`ps_suppkey` Int64\n" - "`ps_availqty` Int64\n" - "`ps_supplycost` Float64\n"; - auto partsupp_types = NamesAndTypesList::parse(partsupp_type_string); - auto partsupp = buildMergeTree(partsupp_types, "home/saber/Documents/data/tpch/mergetree/partsupp", "partsupp"); - - for (auto _ : state) - { - state.PauseTiming(); - QueryPlanPtr supplier_query; - { - auto left = readFromMergeTree(partsupp); - auto right = readFromMergeTree(supplier); - supplier_query = joinPlan(std::move(left), std::move(right), "ps_suppkey", "s_suppkey"); - } - auto right = readFromMergeTree(nation); - auto query_plan = joinPlan(std::move(supplier_query), std::move(right), "s_nationkey", "n_nationkey"); - QueryPlanOptimizationSettings optimization_settings{.optimize_plan = false}; - BuildQueryPipelineSettings pipeline_settings; - auto pipeline_builder = query_plan->buildQueryPipeline(optimization_settings, pipeline_settings); - state.ResumeTiming(); - auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder)); - auto executor = PullingPipelineExecutor(pipeline); - Block header = executor.getHeader(); - [[maybe_unused]] int sum = 0; - while (executor.pull(header)) - { - sum += header.rows(); - } - } -} - BENCHMARK(BM_ParquetRead)->Unit(benchmark::kMillisecond)->Iterations(10); // BENCHMARK(BM_TestDecompress)->Arg(0)->Arg(1)->Arg(2)->Arg(3)->Unit(benchmark::kMillisecond)->Iterations(50)->Repetitions(6)->ComputeStatistics("80%", quantile); diff --git a/cpp-ch/local-engine/tests/gtest_ch_join.cpp b/cpp-ch/local-engine/tests/gtest_ch_join.cpp index 3e8e599c9eb3..1621a9e8868d 100644 --- a/cpp-ch/local-engine/tests/gtest_ch_join.cpp +++ b/cpp-ch/local-engine/tests/gtest_ch_join.cpp @@ -148,97 +148,4 @@ TEST(TestJoin, simple) auto res = pipeline->getHeader().cloneEmpty(); executor.pull(res); debug::headBlock(res); -} - - -TEST(TestJoin, StorageJoinFromReadBufferTest) -{ - auto global_context = SerializedPlanParser::global_context; - auto & factory = DB::FunctionFactory::instance(); - auto function = factory.get("murmurHash2_64", local_engine::SerializedPlanParser::global_context); - auto int_type = DataTypeFactory::instance().get("Int32"); - auto column0 = int_type->createColumn(); - column0->insert(1); - column0->insert(2); - column0->insert(3); - column0->insert(4); - - auto column1 = int_type->createColumn(); - column1->insert(2); - column1->insert(4); - column1->insert(6); - column1->insert(8); - - ColumnsWithTypeAndName columns - = {ColumnWithTypeAndName(std::move(column0), int_type, "colA"), ColumnWithTypeAndName(std::move(column1), int_type, "colB")}; - Block left(columns); - - auto column3 = 
int_type->createColumn(); - column3->insert(1); - column3->insert(2); - column3->insert(3); - column3->insert(5); - - auto column4 = int_type->createColumn(); - column4->insert(1); - column4->insert(3); - column4->insert(5); - column4->insert(9); - - ColumnsWithTypeAndName columns2 - = {ColumnWithTypeAndName(std::move(column3), int_type, "colD"), ColumnWithTypeAndName(std::move(column4), int_type, "colC")}; - Block right(columns2); - std::string buf; - WriteBufferFromString write_buf(buf); - NativeWriter writer(write_buf, 0, right.cloneEmpty()); - writer.write(right); - - auto in = std::make_unique(buf); - auto metadata = local_engine::buildMetaData(right.getNamesAndTypesList(), global_context); - Names cols = {"colD"}; - auto table_join = std::make_shared(SizeLimits(), false, JoinKind::Left, JoinStrictness::All, cols); - auto join_storage = std::shared_ptr(new StorageJoinFromReadBuffer( // NOLINT - *in, - 2048, // Even if you don't know the number of rows, passing an arbitrary value is fine - cols, - false, - table_join, - ColumnsDescription(right.getNamesAndTypesList()), - {}, - "test", - true)); - - auto left_table = std::make_shared(left); - SelectQueryInfo query_info; - - QueryPlan left_plan; - left_plan.addStep(std::make_unique(Pipe(left_table))); - - auto join = std::make_shared(SizeLimits(), false, JoinKind::Left, JoinStrictness::All, right.getNames()); - auto required_rkey = NameAndTypePair("colD", int_type); - join->addJoinedColumn(required_rkey); - join->addJoinedColumn(NameAndTypePair("colC", int_type)); - ASTPtr lkey = std::make_shared("colA"); - ASTPtr rkey = std::make_shared("colD"); - join->addOnKeys(lkey, rkey, false); - - - auto hash_join = join_storage->getJoinLocked(join, global_context); - - QueryPlanStepPtr join_step = std::make_unique(left_plan.getCurrentDataStream(), hash_join, 8192); - - join_step->setStepDescription("JOIN"); - left_plan.addStep(std::move(join_step)); - - ActionsDAGPtr project = std::make_shared(left_plan.getCurrentDataStream().header.getNamesAndTypesList()); - project->project( - {NameWithAlias("colA", "colA"), NameWithAlias("colB", "colB"), NameWithAlias("colD", "colD"), NameWithAlias("colC", "colC")}); - QueryPlanStepPtr project_step = std::make_unique(left_plan.getCurrentDataStream(), project); - left_plan.addStep(std::move(project_step)); - auto pipeline = left_plan.buildQueryPipeline(QueryPlanOptimizationSettings(), BuildQueryPipelineSettings()); - auto executable_pipe = QueryPipelineBuilder::getPipeline(std::move(*pipeline)); - PullingPipelineExecutor executor(executable_pipe); - auto res = pipeline->getHeader().cloneEmpty(); - executor.pull(res); - debug::headBlock(res); -} +} \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/gtest_ch_storages.cpp b/cpp-ch/local-engine/tests/gtest_ch_storages.cpp index 1aea7a4ecbd1..0e6362f704dc 100644 --- a/cpp-ch/local-engine/tests/gtest_ch_storages.cpp +++ b/cpp-ch/local-engine/tests/gtest_ch_storages.cpp @@ -230,65 +230,6 @@ TEST(TestBatchParquetFileSource, local_file) ASSERT_TRUE(total_rows == 59986052); } -TEST(TestWrite, MergeTreeWriteTest) -{ - GTEST_SKIP(); - auto config = local_engine::SerializedPlanParser::config; - config->setString("s3.endpoint", "http://localhost:9000/tpch/"); - config->setString("s3.region", "us-east-1"); - config->setString("s3.access_key_id", "admin"); - config->setString("s3.secret_access_key", "password"); - auto global_context = local_engine::SerializedPlanParser::global_context; - - auto param = DB::MergeTreeData::MergingParams(); - auto settings = 
std::make_unique(); - settings->set("min_bytes_for_wide_part", Field(0)); - settings->set("min_rows_for_wide_part", Field(0)); - - const auto * type_string = "columns format version: 1\n" - "15 columns:\n" - "`l_partkey` Int64\n" - "`l_suppkey` Int64\n" - "`l_linenumber` Int32\n" - "`l_quantity` Float64\n" - "`l_extendedprice` Float64\n" - "`l_discount` Float64\n" - "`l_tax` Float64\n" - "`l_returnflag` String\n" - "`l_linestatus` String\n" - "`l_shipdate` Date\n" - "`l_commitdate` Date\n" - "`l_receiptdate` Date\n" - "`l_shipinstruct` String\n" - "`l_shipmode` String\n" - "`l_comment` String\n"; - auto names_and_types_list = NamesAndTypesList::parse(type_string); - auto metadata = local_engine::buildMetaData(names_and_types_list, global_context); - - local_engine::CustomStorageMergeTree custom_merge_tree( - DB::StorageID("default", "test"), "tmp/test-write/", *metadata, false, global_context, "", param, std::move(settings)); - - substrait::ReadRel::LocalFiles files; - substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items(); - file->set_uri_file("s3://tpch/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet"); - substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; - file->mutable_parquet()->CopyFrom(parquet_format); - auto source = std::make_shared(SerializedPlanParser::global_context, metadata->getSampleBlock(), files); - - QueryPipelineBuilder query_pipeline_builder; - query_pipeline_builder.init(Pipe(source)); - query_pipeline_builder.setSinks( - [&](const Block &, Pipe::StreamType type) -> ProcessorPtr - { - if (type != Pipe::StreamType::Main) - return nullptr; - - return std::make_shared(custom_merge_tree, metadata, global_context); - }); - auto executor = query_pipeline_builder.execute(); - executor->execute(1, false); -} - TEST(TestPrewhere, OptimizePrewhereCondition) { String filter(R"({"scalarFunction":{"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"scalarFunction":{"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"scalarFunction":{"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"scalarFunction":{"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}}, "arguments":[{"value":{"scalarFunction":{"functionReference":1,"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}, {"value":{"literal":{"date":8766}}}]}}},{"value":{"scalarFunction":{"functionReference":2,"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}}, "arguments":[{"value":{"selection":{"directReference":{"structField":{"field":2}}}}},{"value":{"literal":{"date":9131}}}]}}}]}}}, {"value":{"scalarFunction":{"functionReference":3,"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection": {"directReference":{"structField":{}}}}},{"value":{"literal":{"decimal":{"value":"YAkAAAAAAAAAAAAAAAAAAA==","precision":15,"scale":2}}}}]}}}]}}}, {"value":{"scalarFunction":{"functionReference":4,"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection": {"directReference":{"structField":{"field":1}}}}},{"value":{"literal":{"decimal":{"value":"BQAAAAAAAAAAAAAAAAAAAA==","precision":15,"scale":2}}}}]}}}]}}},{"value": {"scalarFunction":{"functionReference":5,"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField": 
{"field":1}}}}},{"value":{"literal":{"decimal":{"value":"BwAAAAAAAAAAAAAAAAAAAA==","precision":15,"scale":2}}}}]}}}]}})"); diff --git a/cpp-ch/local-engine/tests/gtest_local_engine.cpp b/cpp-ch/local-engine/tests/gtest_local_engine.cpp index 9082041d19db..427cf6d3e505 100644 --- a/cpp-ch/local-engine/tests/gtest_local_engine.cpp +++ b/cpp-ch/local-engine/tests/gtest_local_engine.cpp @@ -42,7 +42,6 @@ #include #include #include -#include #include "testConfig.h" using namespace local_engine;