[GLUTEN-7028][CH][Part-8] Support one pipeline write for partition mergetree (apache#7924)

* [Refactor] simple refactor
* [Refactor] Remove setStats
* [Refactor] SparkPartitionedBaseSink and WriteStatsBase
* [Refactor] Add explicit SparkMergeTreeWriteSettings(const DB::ContextPtr & context);
* [New] Support writing partition mergetree in one pipeline
baibaichen authored and PHILO-HE committed Nov 13, 2024
1 parent 0a89a6a commit 05fff70
Showing 15 changed files with 701 additions and 308 deletions.
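
The Scala suite changes below gate the new write path behind a session config. A minimal sketch of how a test exercises it, assuming the suite members spark, basePath, spark35 and the lineitem source table from the existing tests; the table name mergetree_onepipeline_demo is hypothetical:

withSQLConf((CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)) {
  // With the flag enabled (the tests only turn it on for Spark 3.5), inserts into a
  // partitioned clickhouse table are expected to go through the new one-pipeline
  // MergeTree write instead of the previous separate writer path.
  spark.sql(s"""
               |CREATE TABLE IF NOT EXISTS mergetree_onepipeline_demo (l_orderkey bigint, l_shipdate date)
               |USING clickhouse
               |PARTITIONED BY (l_shipdate)
               |LOCATION '$basePath/mergetree_onepipeline_demo'
               |""".stripMargin)
  spark.sql("insert into table mergetree_onepipeline_demo select l_orderkey, l_shipdate from lineitem")
}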
@@ -162,103 +162,107 @@ class GlutenClickHouseMergeTreeWriteSuite
   }
 
   test("test mergetree insert overwrite") {
-    spark.sql(s"""
-                 |DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite;
-                 |""".stripMargin)
+    withSQLConf((CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)) {
+      spark.sql(s"""
+                   |DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite;
+                   |""".stripMargin)
 
-    spark.sql(s"""
-                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite
-                 |(
-                 | l_orderkey bigint,
-                 | l_partkey bigint,
-                 | l_suppkey bigint,
-                 | l_linenumber bigint,
-                 | l_quantity double,
-                 | l_extendedprice double,
-                 | l_discount double,
-                 | l_tax double,
-                 | l_returnflag string,
-                 | l_linestatus string,
-                 | l_shipdate date,
-                 | l_commitdate date,
-                 | l_receiptdate date,
-                 | l_shipinstruct string,
-                 | l_shipmode string,
-                 | l_comment string
-                 |)
-                 |USING clickhouse
-                 |LOCATION '$basePath/lineitem_mergetree_insertoverwrite'
-                 |""".stripMargin)
+      spark.sql(s"""
+                   |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite
+                   |(
+                   | l_orderkey bigint,
+                   | l_partkey bigint,
+                   | l_suppkey bigint,
+                   | l_linenumber bigint,
+                   | l_quantity double,
+                   | l_extendedprice double,
+                   | l_discount double,
+                   | l_tax double,
+                   | l_returnflag string,
+                   | l_linestatus string,
+                   | l_shipdate date,
+                   | l_commitdate date,
+                   | l_receiptdate date,
+                   | l_shipinstruct string,
+                   | l_shipmode string,
+                   | l_comment string
+                   |)
+                   |USING clickhouse
+                   |LOCATION '$basePath/lineitem_mergetree_insertoverwrite'
+                   |""".stripMargin)
 
-    spark.sql(s"""
-                 | insert into table lineitem_mergetree_insertoverwrite
-                 | select * from lineitem
-                 |""".stripMargin)
+      spark.sql(s"""
+                   | insert into table lineitem_mergetree_insertoverwrite
+                   | select * from lineitem
+                   |""".stripMargin)
 
-    spark.sql(s"""
-                 | insert overwrite table lineitem_mergetree_insertoverwrite
-                 | select * from lineitem where mod(l_orderkey,2) = 1
-                 |""".stripMargin)
-    val sql2 =
-      s"""
-         | select count(*) from lineitem_mergetree_insertoverwrite
-         |
-         |""".stripMargin
-    assertResult(300001)(
-      // total rows should remain unchanged
-      spark.sql(sql2).collect().apply(0).get(0)
-    )
+      spark.sql(s"""
+                   | insert overwrite table lineitem_mergetree_insertoverwrite
+                   | select * from lineitem where mod(l_orderkey,2) = 1
+                   |""".stripMargin)
+      val sql2 =
+        s"""
+           | select count(*) from lineitem_mergetree_insertoverwrite
+           |
+           |""".stripMargin
+      assertResult(300001)(
+        // total rows should remain unchanged
+        spark.sql(sql2).collect().apply(0).get(0)
+      )
+    }
   }

test("test mergetree insert overwrite partitioned table with small table, static") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite2;
|""".stripMargin)
withSQLConf((CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, "false")) {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite2;
|""".stripMargin)

spark.sql(s"""
|CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite2
|(
| l_orderkey bigint,
| l_partkey bigint,
| l_suppkey bigint,
| l_linenumber bigint,
| l_quantity double,
| l_extendedprice double,
| l_discount double,
| l_tax double,
| l_returnflag string,
| l_linestatus string,
| l_shipdate date,
| l_commitdate date,
| l_receiptdate date,
| l_shipinstruct string,
| l_shipmode string,
| l_comment string
|)
|USING clickhouse
|PARTITIONED BY (l_shipdate)
|LOCATION '$basePath/lineitem_mergetree_insertoverwrite2'
|""".stripMargin)
spark.sql(s"""
|CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite2
|(
| l_orderkey bigint,
| l_partkey bigint,
| l_suppkey bigint,
| l_linenumber bigint,
| l_quantity double,
| l_extendedprice double,
| l_discount double,
| l_tax double,
| l_returnflag string,
| l_linestatus string,
| l_shipdate date,
| l_commitdate date,
| l_receiptdate date,
| l_shipinstruct string,
| l_shipmode string,
| l_comment string
|)
|USING clickhouse
|PARTITIONED BY (l_shipdate)
|LOCATION '$basePath/lineitem_mergetree_insertoverwrite2'
|""".stripMargin)

spark.sql(s"""
| insert into table lineitem_mergetree_insertoverwrite2
| select * from lineitem
|""".stripMargin)
spark.sql(s"""
| insert into table lineitem_mergetree_insertoverwrite2
| select * from lineitem
|""".stripMargin)

spark.sql(
s"""
| insert overwrite table lineitem_mergetree_insertoverwrite2
| select * from lineitem where l_shipdate BETWEEN date'1993-02-01' AND date'1993-02-10'
|""".stripMargin)
val sql2 =
s"""
| select count(*) from lineitem_mergetree_insertoverwrite2
|
|""".stripMargin
assertResult(2418)(
// total rows should remain unchanged
spark.sql(sql2).collect().apply(0).get(0)
)
spark.sql(
s"""
| insert overwrite table lineitem_mergetree_insertoverwrite2
| select * from lineitem where l_shipdate BETWEEN date'1993-02-01' AND date'1993-02-10'
|""".stripMargin)
val sql2 =
s"""
| select count(*) from lineitem_mergetree_insertoverwrite2
|
|""".stripMargin
assertResult(2418)(
// total rows should remain unchanged
spark.sql(sql2).collect().apply(0).get(0)
)
}
}

test("test mergetree insert overwrite partitioned table with small table, dynamic") {
39 changes: 20 additions & 19 deletions cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
@@ -58,15 +58,11 @@ DB::ProcessorPtr make_sink(
 {
     if (partition_by.empty())
     {
-        auto file_sink = std::make_shared<SubstraitFileSink>(context, base_path, "", filename, format_hint, input_header);
-        file_sink->setStats(stats);
-        return file_sink;
+        return std::make_shared<SubstraitFileSink>(context, base_path, "", filename, format_hint, input_header, stats);
     }
 
-    auto file_sink = std::make_shared<SubstraitPartitionedFileSink>(
-        context, partition_by, input_header, output_header, base_path, filename, format_hint);
-    file_sink->setStats(stats);
-    return file_sink;
+    return std::make_shared<SubstraitPartitionedFileSink>(
+        context, partition_by, input_header, output_header, base_path, filename, format_hint, stats);
 }
 
 DB::ExpressionActionsPtr create_rename_action(const DB::Block & input, const DB::Block & output)
@@ -148,25 +144,30 @@ void addMergeTreeSinkTransform(
     const DB::ContextPtr & context,
     const DB::QueryPipelineBuilderPtr & builder,
     const MergeTreeTable & merge_tree_table,
-    const DB::Block & output,
-    const DB::Names & /*partitionCols*/)
+    const DB::Block & header,
+    const DB::Names & partition_by)
 {
-    const DB::Settings & settings = context->getSettingsRef();
-    const auto dest_storage = merge_tree_table.getStorage(context->getGlobalContext());
-    StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr();
-    ASTPtr none;
-    auto sink = dest_storage->write(none, metadata_snapshot, context, false);
-
     Chain chain;
-    chain.addSink(sink);
-    const SinkHelper & sink_helper = assert_cast<const SparkMergeTreeSink &>(*sink).sinkHelper();
-    //
-    auto stats = std::make_shared<MergeTreeStats>(output, sink_helper);
+    auto stats = std::make_shared<MergeTreeStats>(header);
     chain.addSink(stats);
-    //
 
+    SparkMergeTreeWriteSettings write_settings{context};
+    if (partition_by.empty())
+        write_settings.partition_settings.partition_dir = SubstraitFileSink::NO_PARTITION_ID;
+
+    auto sink = partition_by.empty() ?
+        SparkMergeTreeSink::create(merge_tree_table, write_settings, context->getGlobalContext(), {stats}) :
+        std::make_shared<SparkMergeTreePartitionedFileSink>(header, partition_by, merge_tree_table, write_settings, context, stats);
+
+    chain.addSource(sink);
+    const DB::Settings & settings = context->getSettingsRef();
     chain.addSource(std::make_shared<ApplySquashingTransform>(
-        output, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]));
+        header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]));
     chain.addSource(std::make_shared<PlanSquashingTransform>(
-        output, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]));
+        header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]));
 
     builder->addChain(std::move(chain));
 }
8 changes: 2 additions & 6 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -283,7 +283,7 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list
     return query_plan;
 }
 
-DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPlan & query_plan)
+DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPlan & query_plan) const
 {
     const Settings & settings = parser_context->queryContext()->getSettingsRef();
     QueryPriorities priorities;
@@ -355,11 +355,7 @@ NonNullableColumnsResolver::NonNullableColumnsResolver(
     expression_parser = std::make_unique<ExpressionParser>(parser_context);
 }
 
-NonNullableColumnsResolver::~NonNullableColumnsResolver()
-{
-}
-
-// make it simple at present, if the condition contains or, return empty for both side.
+// make it simple at present if the condition contains or, return empty for both side.
 std::set<String> NonNullableColumnsResolver::resolve()
 {
     collected_columns.clear();
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -48,7 +48,7 @@ class NonNullableColumnsResolver
 public:
     explicit NonNullableColumnsResolver(
         const DB::Block & header_, std::shared_ptr<const ParserContext> parser_context_, const substrait::Expression & cond_rel_);
-    ~NonNullableColumnsResolver();
+    ~NonNullableColumnsResolver() = default;
 
     // return column names
     std::set<String> resolve();
@@ -76,7 +76,7 @@ class SerializedPlanParser
     /// visible for UT
     DB::QueryPlanPtr parse(const substrait::Plan & plan);
     std::unique_ptr<LocalExecutor> createExecutor(const substrait::Plan & plan);
-    DB::QueryPipelineBuilderPtr buildQueryPipeline(DB::QueryPlan & query_plan);
+    DB::QueryPipelineBuilderPtr buildQueryPipeline(DB::QueryPlan & query_plan) const;
     ///
     std::unique_ptr<LocalExecutor> createExecutor(const std::string_view plan);
19 changes: 14 additions & 5 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp
@@ -62,26 +62,35 @@ void SparkMergeTreeSink::onStart()
 void SparkMergeTreeSink::onFinish()
 {
     sink_helper->finish(context);
+    if (stats_.has_value())
+        (*stats_)->collectStats(sink_helper->unsafeGet(), sink_helper->write_settings.partition_settings.partition_dir);
 }
 
 /////
-SinkHelperPtr SparkMergeTreeSink::create(
-    const MergeTreeTable & merge_tree_table, const SparkMergeTreeWriteSettings & write_settings_, const DB::ContextMutablePtr & context)
+SinkToStoragePtr SparkMergeTreeSink::create(
+    const MergeTreeTable & merge_tree_table,
+    const SparkMergeTreeWriteSettings & write_settings_,
+    const DB::ContextMutablePtr & context,
+    const SinkStatsOption & stats)
 {
+    if (write_settings_.partition_settings.part_name_prefix.empty())
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "empty part_name_prefix is not allowed.");
+
     auto dest_storage = merge_tree_table.getStorage(context);
     bool isRemoteStorage = dest_storage->getStoragePolicy()->getAnyDisk()->isRemote();
     bool insert_with_local_storage = !write_settings_.insert_without_local_storage;
+    SinkHelperPtr sink_helper;
     if (insert_with_local_storage && isRemoteStorage)
     {
         auto temp = merge_tree_table.copyToDefaultPolicyStorage(context);
         LOG_DEBUG(
             &Poco::Logger::get("SparkMergeTreeWriter"),
             "Create temp table {} for local merge.",
             temp->getStorageID().getFullNameNotQuoted());
-        return std::make_shared<CopyToRemoteSinkHelper>(temp, dest_storage, write_settings_);
+        sink_helper = std::make_shared<CopyToRemoteSinkHelper>(temp, dest_storage, write_settings_);
     }
-
-    return std::make_shared<DirectSinkHelper>(dest_storage, write_settings_, isRemoteStorage);
+    else
+        sink_helper = std::make_shared<DirectSinkHelper>(dest_storage, write_settings_, isRemoteStorage);
+    return std::make_shared<SparkMergeTreeSink>(sink_helper, context, stats);
 }
 
 SinkHelper::SinkHelper(const SparkStorageMergeTreePtr & data_, const SparkMergeTreeWriteSettings & write_settings_, bool isRemoteStorage_)