Skip to content

Commit

Permalink
EMBEDDED_PLAN and create_plan_and_executor
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Aug 28, 2024
1 parent d579ff4 commit 929465a
Show file tree
Hide file tree
Showing 17 changed files with 619 additions and 99 deletions.
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/tests/benchmark_local_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Shuffle/ShuffleReader.h>
#include <Storages/CustomStorageMergeTree.h>
#include <Storages/MergeTree/CustomStorageMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeTool.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
Expand All @@ -45,7 +46,6 @@
#include <Poco/Util/MapConfiguration.h>
#include <Common/CHUtil.h>
#include <Common/DebugUtils.h>
#include <Common/MergeTreeTool.h>
#include <Common/PODArray_fwd.h>
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
Expand Down
13 changes: 12 additions & 1 deletion cpp-ch/local-engine/tests/gluten_test_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
#include <Formats/FormatSettings.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromFile.h>

#include <Interpreters/ActionsVisitor.h>
#include <Parser/SerializedPlanParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
#include <substrait/plan.pb.h>
#include <Common/BlockTypeUtils.h>
#include <Common/Exception.h>

Expand Down Expand Up @@ -73,6 +74,16 @@ std::optional<ActionsDAG> parseFilter(const std::string & filter, const AnotherR
return ActionsDAG::buildFilterActionsDAG({visitor_data.getActions().getOutputs().back()}, node_name_to_input_column);
}

std::pair<substrait::Plan, std::unique_ptr<LocalExecutor>> create_plan_and_executor(
std::string_view json_plan, std::string_view split_template, std::string_view file, const std::optional<DB::ContextPtr> & context)
{
const std::string split = replaceLocalFilesWildcards(split_template, file);
SerializedPlanParser parser(context.value_or(SerializedPlanParser::global_context));
parser.addSplitInfo(local_engine::JsonStringToBinary<substrait::ReadRel::LocalFiles>(split));
const auto plan = local_engine::JsonStringToMessage<substrait::Plan>(json_plan);
return {plan, parser.createExecutor(plan)};
}

const char * get_data_dir()
{
const auto * const result = std::getenv("PARQUET_TEST_DATA");
Expand Down
29 changes: 22 additions & 7 deletions cpp-ch/local-engine/tests/gluten_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
#include <Core/Block.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>

#include <Interpreters/ActionsDAG.h>
#include <Parser/SerializedPlanParser.h>
#include <boost/algorithm/string/replace.hpp>
#include <parquet/schema.h>

namespace local_engine
{
class LocalExecutor;
}
using BlockRowType = DB::ColumnsWithTypeAndName;
using BlockFieldType = DB::ColumnWithTypeAndName;
using AnotherRowType = DB::NamesAndTypesList;
Expand Down Expand Up @@ -65,12 +68,18 @@ AnotherRowType readParquetSchema(const std::string & file);

std::optional<DB::ActionsDAG> parseFilter(const std::string & filter, const AnotherRowType & name_and_types);

// Parses `json_plan` (JSON text of a substrait::Plan) and creates a LocalExecutor for it.
// `split_template` is JSON text of a substrait::ReadRel::LocalFiles message in which every
// "{replace_local_files}" placeholder is replaced by `file` before it is registered as
// split info. When `context` is empty, SerializedPlanParser::global_context is used.
// Returns the parsed plan together with the executor created from it.
std::pair<substrait::Plan, std::unique_ptr<LocalExecutor>> create_plan_and_executor(
std::string_view json_plan,
std::string_view split_template,
std::string_view file,
const std::optional<DB::ContextPtr> & context = std::nullopt);

}

inline std::string replaceLocalFilesWildcards(const String & haystack, const String & replaced)
inline std::string replaceLocalFilesWildcards(const std::string_view haystack, const std::string_view replaced)
{
static constexpr auto _WILDCARD_ = "{replace_local_files}";
return boost::replace_all_copy(haystack, _WILDCARD_, replaced);
static constexpr auto wildcard = "{replace_local_files}";
return boost::replace_all_copy(std::string{haystack}, wildcard, replaced);
}

inline BlockFieldType toBlockFieldType(const AnotherFieldType & type)
Expand Down Expand Up @@ -122,4 +131,10 @@ inline parquet::ByteArray ByteArrayFromString(const std::string & s)
{
const auto * const ptr = reinterpret_cast<const uint8_t *>(s.data());
return parquet::ByteArray(static_cast<uint32_t>(s.size()), ptr);
}
}

// Yields a std::string_view over a resource embedded with INCBIN(res, ...), i.e. over the
// generated symbols g<res>Data (bytes) and g<res>Size (length).
// The constructor call is written with parentheses rather than braces so that the comma
// between pointer and size stays protected if the expansion is forwarded through another
// function-like macro.
#define EMBEDDED_PLAN(res) std::string_view(reinterpret_cast<const char *>(g##res##Data), g##res##Size)
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/tests/gtest_ch_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/QueryPipelineBuilder.h>

#include <Storages/MergeTree/MergeTreeTool.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <gtest/gtest.h>
#include <Common/DebugUtils.h>
#include <Common/MergeTreeTool.h>

#include <Core/Settings.h>
#include <Interpreters/HashJoin/HashJoin.h>
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/tests/gtest_ch_storages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
#include <Parsers/ASTFunction.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MergeTreeTool.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <google/protobuf/util/json_util.h>
#include <google/protobuf/wrappers.pb.h>
#include <gtest/gtest.h>
#include <substrait/plan.pb.h>
#include <Common/DebugUtils.h>
#include <Common/MergeTreeTool.h>

using namespace DB;
using namespace local_engine;
Expand Down
54 changes: 14 additions & 40 deletions cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,18 @@ using namespace local_engine;
using namespace DB;

// Plan for https://github.com/ClickHouse/ClickHouse/pull/54881
INCBIN(resource_embedded_pr_54881_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_54881.json");

INCBIN(_pr_54881_, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_54881.json");
TEST(Clickhouse, PR54881)
{
const auto context1 = DB::Context::createCopy(SerializedPlanParser::global_context);
// context1->setSetting("enable_named_columns_in_function_tuple", DB::Field(true));
auto settings = context1->getSettingsRef();
EXPECT_FALSE(settings.enable_named_columns_in_function_tuple) << "GLUTEN NEED set enable_named_columns_in_function_tuple to false";

const std::string split_template
constexpr std::string_view split_template
= R"({"items":[{"uriFile":"{replace_local_files}","partitionIndex":"0","length":"1529","parquet":{},"schema":{},"metadataColumns":[{}]}]})";
const std::string split
= replaceLocalFilesWildcards(split_template, GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/data/54881.snappy.parquet"));

SerializedPlanParser parser(context1);
parser.addSplitInfo(local_engine::JsonStringToBinary<substrait::ReadRel::LocalFiles>(split));

const auto plan = local_engine::JsonStringToMessage<substrait::Plan>(
{reinterpret_cast<const char *>(gresource_embedded_pr_54881_jsonData), gresource_embedded_pr_54881_jsonSize});

auto local_executor = parser.createExecutor(plan);
constexpr std::string_view file{GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/data/54881.snappy.parquet")};
auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_pr_54881_), split_template, file, context1);

EXPECT_TRUE(local_executor->hasNext());
const Block & block = *local_executor->nextColumnar();
Expand Down Expand Up @@ -86,53 +77,36 @@ TEST(Clickhouse, PR54881)
}

// Plan for https://github.com/ClickHouse/ClickHouse/pull/65234
INCBIN(resource_embedded_pr_65234_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_65234.json");

INCBIN(_pr_65234_, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_65234.json");
// Parses the embedded plan for ClickHouse PR 65234 into a query plan.
// No assertions are made on the resulting query plan; the plan is only parsed, not executed.
TEST(Clickhouse, PR65234)
{
// The split points at a placeholder URI ("file:///foo"); nothing in this test reads from it.
const std::string split = R"({"items":[{"uriFile":"file:///foo","length":"84633","parquet":{},"schema":{},"metadataColumns":[{}]}]})";
SerializedPlanParser parser(SerializedPlanParser::global_context);
parser.addSplitInfo(local_engine::JsonStringToBinary<substrait::ReadRel::LocalFiles>(split));
const auto plan = local_engine::JsonStringToMessage<substrait::Plan>(
{reinterpret_cast<const char *>(gresource_embedded_pr_65234_jsonData), gresource_embedded_pr_65234_jsonSize});
const auto plan = local_engine::JsonStringToMessage<substrait::Plan>(EMBEDDED_PLAN(_pr_65234_));
auto query_plan = parser.parse(plan);
}

INCBIN(resource_embedded_pr_68135_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_68135.json");
INCBIN(_pr_68135_, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_68135.json");
// Runs the embedded plan for ClickHouse PR 68135 against 68135.snappy.parquet and
// expects the executor to report at least one result block.
TEST(Clickhouse, PR68135)
{
const std::string split_template
constexpr std::string_view split_template
= R"({"items":[{"uriFile":"{replace_local_files}","partitionIndex":"0","length":"461","parquet":{},"schema":{},"metadataColumns":[{}]}]})";
const std::string split
= replaceLocalFilesWildcards(split_template, GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/data/68135.snappy.parquet"));
constexpr std::string_view file{GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/data/68135.snappy.parquet")};
auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_pr_68135_), split_template, file);

SerializedPlanParser parser(SerializedPlanParser::global_context);
parser.addSplitInfo(local_engine::JsonStringToBinary<substrait::ReadRel::LocalFiles>(split));

const auto plan = local_engine::JsonStringToMessage<substrait::Plan>(
{reinterpret_cast<const char *>(gresource_embedded_pr_68135_jsonData), gresource_embedded_pr_68135_jsonSize});

auto local_executor = parser.createExecutor(plan);
// Only the presence of a first block is asserted; its contents are not checked.
EXPECT_TRUE(local_executor->hasNext());
const Block & x = *local_executor->nextColumnar();
// NOTE(review): debug::headBlock presumably dumps the leading rows for inspection - confirm.
debug::headBlock(x);
}

INCBIN(resource_embedded_pr_68131_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_68131.json");
INCBIN(_pr_68131_, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_68131.json");
TEST(Clickhouse, PR68131)
{
const std::string split_template
constexpr std::string_view split_template
= R"({"items":[{"uriFile":"{replace_local_files}","partitionIndex":"0","length":"289","parquet":{},"schema":{},"metadataColumns":[{}]}]})";
const std::string split
= replaceLocalFilesWildcards(split_template, GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/data/68131.parquet"));

SerializedPlanParser parser(SerializedPlanParser::global_context);
parser.addSplitInfo(local_engine::JsonStringToBinary<substrait::ReadRel::LocalFiles>(split));

const auto plan = local_engine::JsonStringToMessage<substrait::Plan>(
{reinterpret_cast<const char *>(gresource_embedded_pr_68131_jsonData), gresource_embedded_pr_68131_jsonSize});

auto local_executor = parser.createExecutor(plan);
auto [_, local_executor] = test::create_plan_and_executor(
EMBEDDED_PLAN(_pr_68131_), split_template, GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/data/68131.parquet"));
EXPECT_TRUE(local_executor->hasNext());
const Block & x = *local_executor->nextColumnar();
debug::headBlock(x);
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/tests/gtest_local_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include <Parser/SubstraitParserUtils.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Storages/CustomStorageMergeTree.h>
#include <Storages/MergeTree/CustomStorageMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <gtest/gtest.h>
#include <substrait/plan.pb.h>
Expand Down
20 changes: 6 additions & 14 deletions cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,19 @@ using namespace local_engine;

using namespace DB;

INCBIN(resource_embedded_pr_18_2_json, SOURCE_DIR "/utils/extern-local-engine/tests/decmial_filter_push_down/18_2.json");
TEST(ColumnIndex, Deciaml182)
INCBIN(_pr_18_2, SOURCE_DIR "/utils/extern-local-engine/tests/decimal_filter_push_down/18_2.json");
TEST(ColumnIndex, Decimal182)
{
// [precision,scale] = [18,2]
const auto context1 = DB::Context::createCopy(SerializedPlanParser::global_context);

auto config = ExecutorConfig::loadFromContext(context1);
const auto config = ExecutorConfig::loadFromContext(context1);
EXPECT_TRUE(config.use_local_format) << "gtest need set use_local_format to true";

const std::string split_template
constexpr std::string_view split_template
= R"({"items":[{"uriFile":"{replace_local_files}","partitionIndex":"0","length":"488","parquet":{},"schema":{},"metadataColumns":[{}]}]})";
const std::string split = replaceLocalFilesWildcards(
split_template, GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/decmial_filter_push_down/18_2_flba.snappy.parquet"));

SerializedPlanParser parser(context1);
parser.addSplitInfo(local_engine::JsonStringToBinary<substrait::ReadRel::LocalFiles>(split));

const auto plan = local_engine::JsonStringToMessage<substrait::Plan>(
{reinterpret_cast<const char *>(gresource_embedded_pr_18_2_jsonData), gresource_embedded_pr_18_2_jsonSize});
constexpr std::string_view file{GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/decimal_filter_push_down/18_2_flba.snappy.parquet")};
auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_pr_18_2), split_template, file, context1);

auto local_executor = parser.createExecutor(plan);
EXPECT_TRUE(local_executor->hasNext());
const Block & x = *local_executor->nextColumnar();
debug::headBlock(x);
Expand Down
10 changes: 4 additions & 6 deletions cpp-ch/local-engine/tests/gtest_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@ using namespace local_engine;
using namespace DB;


INCBIN(resource_embedded_readcsv_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/read_student_option_schema.csv.json");
INCBIN(_readcsv_plan, SOURCE_DIR "/utils/extern-local-engine/tests/json/read_student_option_schema.csv.json");
TEST(LocalExecutor, ReadCSV)
{
const std::string split_template
constexpr std::string_view split_template
= R"({"items":[{"uriFile":"{replace_local_files}","length":"56","text":{"fieldDelimiter":",","maxBlockSize":"8192","header":"1"},"schema":{"names":["id","name","language"],"struct":{"types":[{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}}]}},"metadataColumns":[{}]}]})";
const std::string split = replaceLocalFilesWildcards(
split_template, GLUTEN_SOURCE_DIR("/backends-velox/src/test/resources/datasource/csv/student_option_schema.csv"));
SerializedPlanParser parser(SerializedPlanParser::global_context);
parser.addSplitInfo(local_engine::JsonStringToBinary<substrait::ReadRel::LocalFiles>(split));
auto plan = local_engine::JsonStringToMessage<substrait::Plan>(
{reinterpret_cast<const char *>(gresource_embedded_readcsv_jsonData), gresource_embedded_readcsv_jsonSize});
auto plan = local_engine::JsonStringToMessage<substrait::Plan>(EMBEDDED_PLAN(_readcsv_plan));

auto query_plan = parser.parse(plan);
const auto pipeline = parser.buildQueryPipeline(*query_plan);
LocalExecutor local_executor{std::move(query_plan), QueryPipelineBuilder::getPipeline(std::move(*pipeline))};

EXPECT_TRUE(local_executor.hasNext());
const Block & x = *local_executor.nextColumnar();
EXPECT_EQ(4, x.rows());
Expand All @@ -56,12 +56,10 @@ size_t count(const substrait::Type_Struct & type)
{
size_t ret = 0;
for (const auto & t : type.types())
{
if (t.has_struct_())
ret += 1 + count(t.struct_());
else
ret++;
}
return ret;
}

Expand Down
Loading

0 comments on commit 929465a

Please sign in to comment.