diff --git a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp index 839f3ea529c1a..9a8276c0ad356 100644 --- a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp +++ b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp @@ -35,8 +35,9 @@ #include #include #include -#include +#include #include +#include #include #include #include @@ -45,7 +46,6 @@ #include #include #include -#include #include #include #include diff --git a/cpp-ch/local-engine/tests/decmial_filter_push_down/18_2.json b/cpp-ch/local-engine/tests/decimal_filter_push_down/18_2.json similarity index 100% rename from cpp-ch/local-engine/tests/decmial_filter_push_down/18_2.json rename to cpp-ch/local-engine/tests/decimal_filter_push_down/18_2.json diff --git a/cpp-ch/local-engine/tests/decmial_filter_push_down/18_2_flba.snappy.parquet b/cpp-ch/local-engine/tests/decimal_filter_push_down/18_2_flba.snappy.parquet similarity index 100% rename from cpp-ch/local-engine/tests/decmial_filter_push_down/18_2_flba.snappy.parquet rename to cpp-ch/local-engine/tests/decimal_filter_push_down/18_2_flba.snappy.parquet diff --git a/cpp-ch/local-engine/tests/gluten_test_util.cpp b/cpp-ch/local-engine/tests/gluten_test_util.cpp index 2d558ebe47445..7dbc7206dbb75 100644 --- a/cpp-ch/local-engine/tests/gluten_test_util.cpp +++ b/cpp-ch/local-engine/tests/gluten_test_util.cpp @@ -21,13 +21,14 @@ #include #include #include - #include #include +#include #include #include #include #include +#include #include #include @@ -73,6 +74,16 @@ std::optional parseFilter(const std::string & filter, const AnotherR return ActionsDAG::buildFilterActionsDAG({visitor_data.getActions().getOutputs().back()}, node_name_to_input_column); } +std::pair> create_plan_and_executor( + std::string_view json_plan, std::string_view split_template, std::string_view file, const std::optional & context) +{ + const std::string split = replaceLocalFilesWildcards(split_template, file); + SerializedPlanParser parser(context.value_or(SerializedPlanParser::global_context)); + parser.addSplitInfo(local_engine::JsonStringToBinary(split)); + const auto plan = local_engine::JsonStringToMessage(json_plan); + return {plan, parser.createExecutor(plan)}; +} + const char * get_data_dir() { const auto * const result = std::getenv("PARQUET_TEST_DATA"); diff --git a/cpp-ch/local-engine/tests/gluten_test_util.h b/cpp-ch/local-engine/tests/gluten_test_util.h index 996b27bf884de..9f7380cf5446a 100644 --- a/cpp-ch/local-engine/tests/gluten_test_util.h +++ b/cpp-ch/local-engine/tests/gluten_test_util.h @@ -21,13 +21,16 @@ #include #include #include -#include -#include -#include + #include +#include #include #include +namespace local_engine +{ +class LocalExecutor; +} using BlockRowType = DB::ColumnsWithTypeAndName; using BlockFieldType = DB::ColumnWithTypeAndName; using AnotherRowType = DB::NamesAndTypesList; @@ -65,12 +68,18 @@ AnotherRowType readParquetSchema(const std::string & file); std::optional parseFilter(const std::string & filter, const AnotherRowType & name_and_types); +std::pair> create_plan_and_executor( + std::string_view json_plan, + std::string_view split_template, + std::string_view file, + const std::optional & context = std::nullopt); + } -inline std::string replaceLocalFilesWildcards(const String & haystack, const String & replaced) +inline std::string replaceLocalFilesWildcards(const std::string_view haystack, const std::string_view replaced) { - static constexpr auto _WILDCARD_ = "{replace_local_files}"; - return boost::replace_all_copy(haystack, _WILDCARD_, replaced); + static constexpr auto wildcard = "{replace_local_files}"; + return boost::replace_all_copy(std::string{haystack}, wildcard, replaced); } inline BlockFieldType toBlockFieldType(const AnotherFieldType & type) @@ -122,4 +131,10 @@ inline parquet::ByteArray ByteArrayFromString(const std::string & s) { const auto * const ptr = reinterpret_cast(s.data()); return parquet::ByteArray(static_cast(s.size()), ptr); -} \ No newline at end of file +} + +#define EMBEDDED_PLAN(res) \ + std::string_view \ + { \ + reinterpret_cast(g##res##Data), g##res##Size \ + } \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/gtest_ch_join.cpp b/cpp-ch/local-engine/tests/gtest_ch_join.cpp index e130d82b9c7c4..f41a9a470a526 100644 --- a/cpp-ch/local-engine/tests/gtest_ch_join.cpp +++ b/cpp-ch/local-engine/tests/gtest_ch_join.cpp @@ -26,10 +26,10 @@ #include #include +#include #include #include #include -#include #include #include diff --git a/cpp-ch/local-engine/tests/gtest_ch_storages.cpp b/cpp-ch/local-engine/tests/gtest_ch_storages.cpp index 7de21a26df72f..ddec8897474c0 100644 --- a/cpp-ch/local-engine/tests/gtest_ch_storages.cpp +++ b/cpp-ch/local-engine/tests/gtest_ch_storages.cpp @@ -20,13 +20,13 @@ #include #include #include +#include #include #include #include #include #include #include -#include using namespace DB; using namespace local_engine; diff --git a/cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp b/cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp index 5b5797ed7d21d..1e2525b33a5b8 100644 --- a/cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp +++ b/cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp @@ -28,8 +28,7 @@ using namespace local_engine; using namespace DB; // Plan for https://github.com/ClickHouse/ClickHouse/pull/54881 -INCBIN(resource_embedded_pr_54881_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_54881.json"); - +INCBIN(_pr_54881_, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_54881.json"); TEST(Clickhouse, PR54881) { const auto context1 = DB::Context::createCopy(SerializedPlanParser::global_context); @@ -37,18 +36,10 @@ TEST(Clickhouse, PR54881) auto settings = context1->getSettingsRef(); EXPECT_FALSE(settings.enable_named_columns_in_function_tuple) << "GLUTEN NEED set enable_named_columns_in_function_tuple to false"; - const std::string split_template + constexpr std::string_view split_template = R"({"items":[{"uriFile":"{replace_local_files}","partitionIndex":"0","length":"1529","parquet":{},"schema":{},"metadataColumns":[{}]}]})"; - const std::string split - = replaceLocalFilesWildcards(split_template, GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/data/54881.snappy.parquet")); - - SerializedPlanParser parser(context1); - parser.addSplitInfo(local_engine::JsonStringToBinary(split)); - - const auto plan = local_engine::JsonStringToMessage( - {reinterpret_cast(gresource_embedded_pr_54881_jsonData), gresource_embedded_pr_54881_jsonSize}); - - auto local_executor = parser.createExecutor(plan); + constexpr std::string_view file{GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/data/54881.snappy.parquet")}; + auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_pr_54881_), split_template, file, context1); EXPECT_TRUE(local_executor->hasNext()); const Block & block = *local_executor->nextColumnar(); @@ -86,53 +77,36 @@ TEST(Clickhouse, PR54881) } // Plan for https://github.com/ClickHouse/ClickHouse/pull/65234 -INCBIN(resource_embedded_pr_65234_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_65234.json"); - +INCBIN(_pr_65234_, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_65234.json"); TEST(Clickhouse, PR65234) { const std::string split = R"({"items":[{"uriFile":"file:///foo","length":"84633","parquet":{},"schema":{},"metadataColumns":[{}]}]})"; SerializedPlanParser parser(SerializedPlanParser::global_context); parser.addSplitInfo(local_engine::JsonStringToBinary(split)); - const auto plan = local_engine::JsonStringToMessage( - {reinterpret_cast(gresource_embedded_pr_65234_jsonData), gresource_embedded_pr_65234_jsonSize}); + const auto plan = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_pr_65234_)); auto query_plan = parser.parse(plan); } -INCBIN(resource_embedded_pr_68135_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_68135.json"); +INCBIN(_pr_68135_, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_68135.json"); TEST(Clickhouse, PR68135) { - const std::string split_template + constexpr std::string_view split_template = R"({"items":[{"uriFile":"{replace_local_files}","partitionIndex":"0","length":"461","parquet":{},"schema":{},"metadataColumns":[{}]}]})"; - const std::string split - = replaceLocalFilesWildcards(split_template, GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/data/68135.snappy.parquet")); + constexpr std::string_view file{GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/data/68135.snappy.parquet")}; + auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_pr_68135_), split_template, file); - SerializedPlanParser parser(SerializedPlanParser::global_context); - parser.addSplitInfo(local_engine::JsonStringToBinary(split)); - - const auto plan = local_engine::JsonStringToMessage( - {reinterpret_cast(gresource_embedded_pr_68135_jsonData), gresource_embedded_pr_68135_jsonSize}); - - auto local_executor = parser.createExecutor(plan); EXPECT_TRUE(local_executor->hasNext()); const Block & x = *local_executor->nextColumnar(); debug::headBlock(x); } -INCBIN(resource_embedded_pr_68131_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_68131.json"); +INCBIN(_pr_68131_, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_68131.json"); TEST(Clickhouse, PR68131) { - const std::string split_template + constexpr std::string_view split_template = R"({"items":[{"uriFile":"{replace_local_files}","partitionIndex":"0","length":"289","parquet":{},"schema":{},"metadataColumns":[{}]}]})"; - const std::string split - = replaceLocalFilesWildcards(split_template, GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/data/68131.parquet")); - - SerializedPlanParser parser(SerializedPlanParser::global_context); - parser.addSplitInfo(local_engine::JsonStringToBinary(split)); - - const auto plan = local_engine::JsonStringToMessage( - {reinterpret_cast(gresource_embedded_pr_68131_jsonData), gresource_embedded_pr_68131_jsonSize}); - - auto local_executor = parser.createExecutor(plan); + auto [_, local_executor] = test::create_plan_and_executor( + EMBEDDED_PLAN(_pr_68131_), split_template, GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/data/68131.parquet")); EXPECT_TRUE(local_executor->hasNext()); const Block & x = *local_executor->nextColumnar(); debug::headBlock(x); diff --git a/cpp-ch/local-engine/tests/gtest_local_engine.cpp b/cpp-ch/local-engine/tests/gtest_local_engine.cpp index 86cea39e5eb44..a42371c9547cf 100644 --- a/cpp-ch/local-engine/tests/gtest_local_engine.cpp +++ b/cpp-ch/local-engine/tests/gtest_local_engine.cpp @@ -29,7 +29,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp b/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp index ee6e70305b27b..4436bf0cd0acf 100644 --- a/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp +++ b/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp @@ -28,27 +28,19 @@ using namespace local_engine; using namespace DB; -INCBIN(resource_embedded_pr_18_2_json, SOURCE_DIR "/utils/extern-local-engine/tests/decmial_filter_push_down/18_2.json"); -TEST(ColumnIndex, Deciaml182) +INCBIN(_pr_18_2, SOURCE_DIR "/utils/extern-local-engine/tests/decimal_filter_push_down/18_2.json"); +TEST(ColumnIndex, Decimal182) { // [precision,scale] = [18,2] const auto context1 = DB::Context::createCopy(SerializedPlanParser::global_context); - - auto config = ExecutorConfig::loadFromContext(context1); + const auto config = ExecutorConfig::loadFromContext(context1); EXPECT_TRUE(config.use_local_format) << "gtest need set use_local_format to true"; - const std::string split_template + constexpr std::string_view split_template = R"({"items":[{"uriFile":"{replace_local_files}","partitionIndex":"0","length":"488","parquet":{},"schema":{},"metadataColumns":[{}]}]})"; - const std::string split = replaceLocalFilesWildcards( - split_template, GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/decmial_filter_push_down/18_2_flba.snappy.parquet")); - - SerializedPlanParser parser(context1); - parser.addSplitInfo(local_engine::JsonStringToBinary(split)); - - const auto plan = local_engine::JsonStringToMessage( - {reinterpret_cast(gresource_embedded_pr_18_2_jsonData), gresource_embedded_pr_18_2_jsonSize}); + constexpr std::string_view file{GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/decimal_filter_push_down/18_2_flba.snappy.parquet")}; + auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_pr_18_2), split_template, file, context1); - auto local_executor = parser.createExecutor(plan); EXPECT_TRUE(local_executor->hasNext()); const Block & x = *local_executor->nextColumnar(); debug::headBlock(x); diff --git a/cpp-ch/local-engine/tests/gtest_parser.cpp b/cpp-ch/local-engine/tests/gtest_parser.cpp index 135f81a9149e7..bf52bd54ccee0 100644 --- a/cpp-ch/local-engine/tests/gtest_parser.cpp +++ b/cpp-ch/local-engine/tests/gtest_parser.cpp @@ -32,21 +32,21 @@ using namespace local_engine; using namespace DB; -INCBIN(resource_embedded_readcsv_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/read_student_option_schema.csv.json"); +INCBIN(_readcsv_plan, SOURCE_DIR "/utils/extern-local-engine/tests/json/read_student_option_schema.csv.json"); TEST(LocalExecutor, ReadCSV) { - const std::string split_template + constexpr std::string_view split_template = R"({"items":[{"uriFile":"{replace_local_files}","length":"56","text":{"fieldDelimiter":",","maxBlockSize":"8192","header":"1"},"schema":{"names":["id","name","language"],"struct":{"types":[{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}}]}},"metadataColumns":[{}]}]})"; const std::string split = replaceLocalFilesWildcards( split_template, GLUTEN_SOURCE_DIR("/backends-velox/src/test/resources/datasource/csv/student_option_schema.csv")); SerializedPlanParser parser(SerializedPlanParser::global_context); parser.addSplitInfo(local_engine::JsonStringToBinary(split)); - auto plan = local_engine::JsonStringToMessage( - {reinterpret_cast(gresource_embedded_readcsv_jsonData), gresource_embedded_readcsv_jsonSize}); + auto plan = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_readcsv_plan)); auto query_plan = parser.parse(plan); const auto pipeline = parser.buildQueryPipeline(*query_plan); LocalExecutor local_executor{std::move(query_plan), QueryPipelineBuilder::getPipeline(std::move(*pipeline))}; + EXPECT_TRUE(local_executor.hasNext()); const Block & x = *local_executor.nextColumnar(); EXPECT_EQ(4, x.rows()); @@ -56,12 +56,10 @@ size_t count(const substrait::Type_Struct & type) { size_t ret = 0; for (const auto & t : type.types()) - { if (t.has_struct_()) ret += 1 + count(t.struct_()); else ret++; - } return ret; } diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp index e56184a1417cf..1771760e9661e 100644 --- a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp +++ b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp @@ -20,13 +20,21 @@ #include #include #include +#include +#include +#include #include #include #include #include #include #include +#include #include +#include +#include +#include +#include #include #include #include @@ -103,24 +111,19 @@ TEST(LocalExecutor, StorageObjectStorageSink) } -INCBIN(resource_embedded_write_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/native_write_plan.json"); +INCBIN(native_write, SOURCE_DIR "/utils/extern-local-engine/tests/json/native_write_plan.json"); TEST(WritePipeline, SubstraitFileSink) { + const auto context = DB::Context::createCopy(SerializedPlanParser::global_context); const auto tmpdir = std::string{"file:///tmp/test_table/test"}; const auto filename = std::string{"data.parquet"}; - const std::string split_template - = R"({"items":[{"uriFile":"{replace_local_files}","length":"1399183","text":{"fieldDelimiter":"|","maxBlockSize":"8192"},"schema":{"names":["s_suppkey","s_name","s_address","s_nationkey","s_phone","s_acctbal","s_comment"],"struct":{"types":[{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"decimal":{"scale":2,"precision":15,"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}}]}},"metadataColumns":[{}]}]})"; - const std::string split - = replaceLocalFilesWildcards(split_template, GLUTEN_SOURCE_DIR("/backends-clickhouse/src/test/resources/csv-data/supplier.csv")); - - const auto context = DB::Context::createCopy(SerializedPlanParser::global_context); context->setSetting(local_engine::SPARK_TASK_WRITE_TMEP_DIR, tmpdir); context->setSetting(local_engine::SPARK_TASK_WRITE_FILENAME, filename); - SerializedPlanParser parser(context); - parser.addSplitInfo(local_engine::JsonStringToBinary(split)); - const auto plan = local_engine::JsonStringToMessage( - {reinterpret_cast(gresource_embedded_write_jsonData), gresource_embedded_write_jsonSize}); + constexpr std::string_view split_template + = R"({"items":[{"uriFile":"{replace_local_files}","length":"1399183","text":{"fieldDelimiter":"|","maxBlockSize":"8192"},"schema":{"names":["s_suppkey","s_name","s_address","s_nationkey","s_phone","s_acctbal","s_comment"],"struct":{"types":[{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"decimal":{"scale":2,"precision":15,"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}}]}},"metadataColumns":[{}]}]})"; + constexpr std::string_view file{GLUTEN_SOURCE_DIR("/backends-clickhouse/src/test/resources/csv-data/supplier.csv")}; + auto [plan, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(native_write), split_template, file, context); EXPECT_EQ(1, plan.relations_size()); const substrait::PlanRel & root_rel = plan.relations().at(0); @@ -139,7 +142,6 @@ TEST(WritePipeline, SubstraitFileSink) EXPECT_EQ("parquet", config["format"]); EXPECT_EQ("1", config["isSnappy"]); - EXPECT_TRUE(write_rel.has_table_schema()); const substrait::NamedStruct & table_schema = write_rel.table_schema(); auto block = TypeParser::buildBlockFromNamedStruct(table_schema); @@ -151,8 +153,6 @@ TEST(WritePipeline, SubstraitFileSink) DB::Names expected_partition_cols; EXPECT_EQ(expected_partition_cols, partitionCols); - - auto local_executor = parser.createExecutor(plan); EXPECT_TRUE(local_executor->hasNext()); const Block & x = *local_executor->nextColumnar(); debug::headBlock(x); @@ -165,23 +165,18 @@ TEST(WritePipeline, SubstraitFileSink) EXPECT_EQ(10000, col_c.getInt(0)); } -INCBIN(resource_embedded_write_one_partition_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/native_write_one_partition.json"); +INCBIN(native_write_one_partition, SOURCE_DIR "/utils/extern-local-engine/tests/json/native_write_one_partition.json"); TEST(WritePipeline, SubstraitPartitionedFileSink) { - const std::string split_template - = R"({"items":[{"uriFile":"{replace_local_files}","length":"1399183","text":{"fieldDelimiter":"|","maxBlockSize":"8192"},"schema":{"names":["s_suppkey","s_name","s_address","s_nationkey","s_phone","s_acctbal","s_comment"],"struct":{"types":[{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"decimal":{"scale":2,"precision":15,"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}}]}},"metadataColumns":[{}]}]})"; - const std::string split - = replaceLocalFilesWildcards(split_template, GLUTEN_SOURCE_DIR("/backends-clickhouse/src/test/resources/csv-data/supplier.csv")); - const auto context = DB::Context::createCopy(SerializedPlanParser::global_context); context->setSetting(local_engine::SPARK_TASK_WRITE_TMEP_DIR, std::string{"file:///tmp/test_table/test_partition"}); context->setSetting(local_engine::SPARK_TASK_WRITE_FILENAME, std::string{"data.parquet"}); - SerializedPlanParser parser(context); - parser.addSplitInfo(local_engine::JsonStringToBinary(split)); - const auto plan = local_engine::JsonStringToMessage( - {reinterpret_cast(gresource_embedded_write_one_partition_jsonData), gresource_embedded_write_one_partition_jsonSize}); + constexpr std::string_view split_template + = R"({"items":[{"uriFile":"{replace_local_files}","length":"1399183","text":{"fieldDelimiter":"|","maxBlockSize":"8192"},"schema":{"names":["s_suppkey","s_name","s_address","s_nationkey","s_phone","s_acctbal","s_comment"],"struct":{"types":[{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"decimal":{"scale":2,"precision":15,"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}}]}},"metadataColumns":[{}]}]})"; + constexpr std::string_view file{GLUTEN_SOURCE_DIR("/backends-clickhouse/src/test/resources/csv-data/supplier.csv")}; + auto [plan, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(native_write_one_partition), split_template, file, context); EXPECT_EQ(1, plan.relations_size()); const substrait::PlanRel & root_rel = plan.relations().at(0); @@ -212,7 +207,6 @@ TEST(WritePipeline, SubstraitPartitionedFileSink) DB::Names expected_partition_cols{"s_nationkey"}; EXPECT_EQ(expected_partition_cols, partitionCols); - auto local_executor = parser.createExecutor(plan); EXPECT_TRUE(local_executor->hasNext()); const Block & x = *local_executor->nextColumnar(); debug::headBlock(x, 25); @@ -250,4 +244,193 @@ TEST(WritePipeline, ComputePartitionedExpression) EXPECT_EQ("s_nationkey=1/name=one", partition_by_result_column->getDataAt(0)); EXPECT_EQ("s_nationkey=2/name=two", partition_by_result_column->getDataAt(1)); EXPECT_EQ("s_nationkey=3/name=three", partition_by_result_column->getDataAt(2)); +} + +void do_remove(const std::string & folder) +{ + namespace fs = std::filesystem; + if (const std::filesystem::path ph(folder); fs::exists(ph)) + fs::remove_all(ph); +} + +Chunk person_chunk() +{ + auto id = INT()->createColumn(); + id->insert(100); + id->insert(200); + id->insert(300); + id->insert(400); + id->insert(500); + id->insert(600); + id->insert(700); + + auto name = STRING()->createColumn(); + name->insert("Joe"); + name->insert("Marry"); + name->insert("Mike"); + name->insert("Fred"); + name->insert("Albert"); + name->insert("Michelle"); + name->insert("Dan"); + + auto age = makeNullable(INT())->createColumn(); + Field null_field; + age->insert(30); + age->insert(null_field); + age->insert(18); + age->insert(50); + age->insert(null_field); + age->insert(30); + age->insert(50); + + + MutableColumns x; + x.push_back(std::move(id)); + x.push_back(std::move(name)); + x.push_back(std::move(age)); + return {std::move(x), 7}; +} + +TEST(WritePipeline, MergeTree) +{ + ThreadStatus thread_status; + + const auto context = DB::Context::createCopy(SerializedPlanParser::global_context); + context->setPath("./"); + const Settings & settings = context->getSettingsRef(); + + const std::string query + = R"(create table if not exists person (id Int32, Name String, Age Nullable(Int32)) engine = MergeTree() ORDER BY id)"; + + const char * begin = query.data(); + const char * end = query.data() + query.size(); + ParserQuery parser(end, settings.allow_settings_after_format_in_insert); + + ASTPtr ast = parseQuery(parser, begin, end, "", settings.max_query_size, settings.max_parser_depth, settings.max_parser_backtracks); + + EXPECT_TRUE(ast->as()); + auto & create = ast->as(); + + ColumnsDescription column_descriptions + = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, context, LoadingStrictnessLevel::CREATE); + + StorageInMemoryMetadata metadata; + metadata.setColumns(column_descriptions); + metadata.setComment("args.comment"); + ASTPtr partition_by_key; + metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_key, metadata.columns, context); + + MergeTreeData::MergingParams merging_params; + merging_params.mode = MergeTreeData::MergingParams::Ordinary; + + + /// This merging param maybe used as part of sorting key + std::optional merging_param_key_arg; + /// Get sorting key from engine arguments. + /// + /// NOTE: store merging_param_key_arg as additional key column. We do it + /// before storage creation. After that storage will just copy this + /// column if sorting key will be changed. + metadata.sorting_key + = KeyDescription::getSortingKeyFromAST(create.storage->order_by->ptr(), metadata.columns, context, merging_param_key_arg); + + std::unique_ptr storage_settings = std::make_unique(context->getMergeTreeSettings()); + + UUID uuid; + UUIDHelpers::getHighBytes(uuid) = 0xffffffffffff0fffull | 0x0000000000004000ull; + UUIDHelpers::getLowBytes(uuid) = 0x3fffffffffffffffull | 0x8000000000000000ull; + + SCOPE_EXIT({ do_remove("WritePipeline_MergeTree"); }); + + auto merge_tree = std::make_shared( + StorageID("", "", uuid), + "WritePipeline_MergeTree", + metadata, + LoadingStrictnessLevel::CREATE, + context, + "", + merging_params, + std::move(storage_settings)); + + Block header{{INT(), "id"}, {STRING(), "Name"}, {makeNullable(INT()), "Age"}}; + DB::Squashing squashing(header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); + squashing.add(person_chunk()); + auto x = Squashing::squash(squashing.flush()); + x.getChunkInfos().add(std::make_shared()); + + ASSERT_EQ(7, x.getNumRows()); + ASSERT_EQ(3, x.getNumColumns()); + + + auto metadata_snapshot = std::make_shared(metadata); + ASTPtr none; + auto sink = std::static_pointer_cast(merge_tree->write(none, metadata_snapshot, context, false)); + + sink->consume(x); + sink->onFinish(); +} + +INCBIN(_1_mergetree_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree.json"); +INCBIN(_1_mergetree_hdfs_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree_hdfs.json"); +INCBIN(_1_read_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_plan.json"); + +TEST(WritePipeline, SparkMergeTree) +{ + const auto context = DB::Context::createCopy(SerializedPlanParser::global_context); + context->setPath("./"); + const Settings & settings = context->getSettingsRef(); + + const auto extension_table = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_)); + const auto merge_tree_table = MergeTreeRelParser::parseMergeTreeTable(extension_table); + + EXPECT_EQ(merge_tree_table.database, "default"); + EXPECT_EQ(merge_tree_table.table, "lineitem_mergetree"); + EXPECT_EQ(merge_tree_table.relative_path, "lineitem_mergetree"); + EXPECT_EQ(merge_tree_table.table_configs.storage_policy, "default"); + + do_remove(merge_tree_table.relative_path); + + const auto dest_storage = MergeTreeRelParser::parseStorage(merge_tree_table, SerializedPlanParser::global_context); + EXPECT_TRUE(dest_storage); + EXPECT_FALSE(dest_storage->getStoragePolicy()->getAnyDisk()->isRemote()); + DB::StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr(); + Block header = metadata_snapshot->getSampleBlock(); + + constexpr std::string_view split_template + = R"({"items":[{"uriFile":"{replace_local_files}","length":"19230111","parquet":{},"schema":{},"metadataColumns":[{}],"properties":{"fileSize":"19230111","modificationTime":"1722330598029"}}]})"; + constexpr std::string_view file{GLUTEN_SOURCE_TPCH_DIR("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet")}; + + auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_1_read_), split_template, file); + EXPECT_TRUE(local_executor->hasNext()); + SparkMergeTreeWriter spark_merge_tree_writer(merge_tree_table, context, "this_is_test_"); + + Chain chain; + + // chain.addSink() + + //DB::Squashing squashing(header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); + + do + { + spark_merge_tree_writer.write(*local_executor->nextColumnar()); + } while (local_executor->hasNext()); + spark_merge_tree_writer.finalize(); + auto part_infos = spark_merge_tree_writer.getAllPartInfo(); + auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(part_infos); + std::cerr << json_info << std::endl; + + /// + { + const auto extension_table_hdfs + = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_hdfs_)); + const auto merge_tree_table_hdfs = MergeTreeRelParser::parseMergeTreeTable(extension_table_hdfs); + EXPECT_EQ(merge_tree_table_hdfs.database, "default"); + EXPECT_EQ(merge_tree_table_hdfs.table, "lineitem_mergetree_hdfs"); + EXPECT_EQ(merge_tree_table_hdfs.relative_path, "3.5/test/lineitem_mergetree_hdfs"); + EXPECT_EQ(merge_tree_table_hdfs.table_configs.storage_policy, "__hdfs_main"); + + const auto dest_storage_hdfs = MergeTreeRelParser::parseStorage(merge_tree_table_hdfs, SerializedPlanParser::global_context); + EXPECT_TRUE(dest_storage_hdfs); + EXPECT_TRUE(dest_storage_hdfs->getStoragePolicy()->getAnyDisk()->isRemote()); + } } \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/json/gtest_local_engine_config.json b/cpp-ch/local-engine/tests/json/gtest_local_engine_config.json index 8ada07819bb6c..ecd86faa549a1 100644 --- a/cpp-ch/local-engine/tests/json/gtest_local_engine_config.json +++ b/cpp-ch/local-engine/tests/json/gtest_local_engine_config.json @@ -268,6 +268,94 @@ "value": { "string": "true" } + }, + { + "key": { + "string": "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.type" + }, + "value": { + "string": "hdfs_gluten" + } + }, + { + "key": { + "string": "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.path" + }, + "value": { + "string": "/tmp/hdfs_cache/3.5/" + } + }, + { + "key": { + "string": "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.type" + }, + "value": { + "string": "cache" + } + }, + { + "key": { + "string": "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.max_size" + }, + "value": { + "string": "10Gi" + } + }, + { + "key": { + "string": "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes.main.disk" + }, + "value": { + "string": "hdfs_cache" + } + }, + { + "key": { + "string": "spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_client_read_shortcircuit" + }, + "value": { + "string": "false" + } + }, + { + "key": { + "string": "spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_default_replica" + }, + "value": { + "string": "1" + } + }, + { + "key": { + "string": "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.metadata_path" + }, + "value": { + "string": "/tmp/metadata/hdfs/3.5/" + } + }, + { + "key": { + "string": "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.endpoint" + }, + "value": { + "string": "hdfs://127.0.0.1:8020/" + } + }, + { + "key": { + "string": "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes" + }, + "value": { + "string": "main" + } + }, + { + "key": { + "string": "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.disk" + }, + "value": { + "string": "hdfs" + } } ] } diff --git a/cpp-ch/local-engine/tests/json/mergetree/1_mergetree.json b/cpp-ch/local-engine/tests/json/mergetree/1_mergetree.json new file mode 100644 index 0000000000000..eecb8d7484d26 --- /dev/null +++ b/cpp-ch/local-engine/tests/json/mergetree/1_mergetree.json @@ -0,0 +1,6 @@ +{ + "detail": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "MergeTree;default\nlineitem_mergetree\n1724764750266_0\n{\"names\":[\"l_orderkey\",\"l_partkey\",\"l_suppkey\",\"l_linenumber\",\"l_quantity\",\"l_extendedprice\",\"l_discount\",\"l_tax\",\"l_returnflag\",\"l_linestatus\",\"l_shipdate\",\"l_commitdate\",\"l_receiptdate\",\"l_shipinstruct\",\"l_shipmode\",\"l_comment\"],\"struct\":{\"types\":[{\"i64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"i64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"i64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"i64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"fp64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"fp64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"fp64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"fp64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"string\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"string\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"date\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"date\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"date\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"string\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"string\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"string\":{\"nullability\":\"NULLABILITY_NULLABLE\"}}]}}\ntuple()\n\n\n\n\nlineitem_mergetree\n\n{\"storage_policy\":\"default\"}\n" + } +} \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/json/mergetree/1_mergetree_hdfs.json b/cpp-ch/local-engine/tests/json/mergetree/1_mergetree_hdfs.json new file mode 100644 index 0000000000000..d1ed674596e54 --- /dev/null +++ b/cpp-ch/local-engine/tests/json/mergetree/1_mergetree_hdfs.json @@ -0,0 +1,6 @@ +{ + "detail": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "MergeTree;default\nlineitem_mergetree_hdfs\n1724766584676_0\n{\"names\":[\"l_orderkey\",\"l_partkey\",\"l_suppkey\",\"l_linenumber\",\"l_quantity\",\"l_extendedprice\",\"l_discount\",\"l_tax\",\"l_returnflag\",\"l_linestatus\",\"l_shipdate\",\"l_commitdate\",\"l_receiptdate\",\"l_shipinstruct\",\"l_shipmode\",\"l_comment\"],\"struct\":{\"types\":[{\"i64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"i64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"i64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"i64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"fp64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"fp64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"fp64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"fp64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"string\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"string\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"date\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"date\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"date\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"string\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"string\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},{\"string\":{\"nullability\":\"NULLABILITY_NULLABLE\"}}]}}\ntuple()\n\n\n\n\n3.5/test/lineitem_mergetree_hdfs\n\n{\"storage_policy\":\"__hdfs_main\"}\n" + } +} \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/json/mergetree/1_plan.json b/cpp-ch/local-engine/tests/json/mergetree/1_plan.json new file mode 100644 index 0000000000000..20d542d81784a --- /dev/null +++ b/cpp-ch/local-engine/tests/json/mergetree/1_plan.json @@ -0,0 +1,246 @@ +{ + "relations": [ + { + "root": { + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": [ + "l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate", + "l_commitdate", + "l_receiptdate", + "l_shipinstruct", + "l_shipmode", + "l_comment" + ], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL" + ] + }, + "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "isMergeTree=0\n" + } + } + } + }, + "names": [ + "l_orderkey#84", + "l_partkey#85", + "l_suppkey#86", + "l_linenumber#87", + "l_quantity#88", + "l_extendedprice#89", + "l_discount#90", + "l_tax#91", + "l_returnflag#92", + "l_linestatus#93", + "l_shipdate#94", + "l_commitdate#95", + "l_receiptdate#96", + "l_shipinstruct#97", + "l_shipmode#98", + "l_comment#99" + ], + "outputSchema": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "nullability": "NULLABILITY_REQUIRED" + } + } + } + ] +} \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/testConfig.h.in b/cpp-ch/local-engine/tests/testConfig.h.in index 8dd3c2cb68d9c..98740e48e9306 100644 --- a/cpp-ch/local-engine/tests/testConfig.h.in +++ b/cpp-ch/local-engine/tests/testConfig.h.in @@ -5,4 +5,5 @@ #define MERGETREE_DATA(file) "@MERGETREE_DATA_DIR@"#file #define GLUTEN_SOURCE_DIR_ "file://@GLUTEN_REAL_PATH@" -#define GLUTEN_SOURCE_DIR(file) GLUTEN_SOURCE_DIR_ file \ No newline at end of file +#define GLUTEN_SOURCE_DIR(file) GLUTEN_SOURCE_DIR_ file +#define GLUTEN_SOURCE_TPCH_DIR(file) GLUTEN_SOURCE_DIR("/gluten-core/src/test/resources/tpch-data/" file) \ No newline at end of file