Skip to content

Commit

Permalink
Remove write_setting of SparkMergeTreeWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Sep 4, 2024
1 parent dd5e721 commit 717a5a6
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 21 deletions.
13 changes: 11 additions & 2 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,10 @@ SinkHelperPtr SinkHelper::create(
}

SinkHelper::SinkHelper(const CustomStorageMergeTreePtr & data_, const GlutenMergeTreeWriteSettings & write_settings_, bool isRemoteStorage_)
: write_settings(write_settings_)
, data(data_)
: data(data_)
, isRemoteStorage(isRemoteStorage_)
, thread_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1, 1, 100000)
, write_settings(write_settings_)
, metadata_snapshot(data->getInMemoryMetadataPtr())
, header(metadata_snapshot->getSampleBlock())
{
Expand Down Expand Up @@ -342,6 +342,8 @@ void SinkHelper::writeTempPart(DB::BlockWithPartition & block_with_partition, co

void SinkHelper::checkAndMerge(bool force)
{
if (!write_settings.merge_after_insert)
return;
// Only finalize should force merge.
if (!force && new_parts.size() < write_settings.merge_limit_parts)
return;
Expand Down Expand Up @@ -380,6 +382,13 @@ void SinkHelper::checkAndMerge(bool force)

new_parts.emplace_back(skip_parts);
}
/// Finish the sink after the last block has been written: optionally run the
/// post-insert merge, then commit all written parts and persist metadata.
/// @param context query context; supplies the read/write settings used by commit()
///        and is forwarded to saveMetadata().
void SinkHelper::finish(const DB::ContextPtr & context)
{
// Merge freshly inserted parts only when the write was configured with
// merge_after_insert; otherwise parts are committed as written.
if (write_settings.merge_after_insert)
finalizeMerge();
commit(context->getReadSettings(), context->getWriteSettings());
saveMetadata(context);
}

void SinkHelper::finalizeMerge()
{
Expand Down
8 changes: 4 additions & 4 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ class ConcurrentDeque
class SinkHelper
{
protected:
const GlutenMergeTreeWriteSettings write_settings;
CustomStorageMergeTreePtr data;
bool isRemoteStorage;

Expand All @@ -176,11 +175,13 @@ class SinkHelper
ThreadPool thread_pool;

public:
const GlutenMergeTreeWriteSettings write_settings;
const DB::StorageMetadataPtr metadata_snapshot;
const DB::Block header;

protected:
void doMergePartsAsync(const std::vector<DB::MergeTreeDataPartPtr> & prepare_merge_parts);
void finalizeMerge();
virtual void cleanup() { }
SparkStorageMergeTree & dataRef() const { return assert_cast<SparkStorageMergeTree &>(*data); }

Expand All @@ -189,11 +190,10 @@ class SinkHelper

const std::deque<DB::MergeTreeDataPartPtr> & unsafeGet() const { return new_parts.unsafeGet(); }
void checkAndMerge(bool force = false);
void finalizeMerge();
void finish(const DB::ContextPtr & context);

virtual ~SinkHelper() = default;
explicit SinkHelper(
const CustomStorageMergeTreePtr & data_, const GlutenMergeTreeWriteSettings & write_settings_, bool isRemoteStorage_);
SinkHelper(const CustomStorageMergeTreePtr & data_, const GlutenMergeTreeWriteSettings & write_settings_, bool isRemoteStorage_);
static SinkHelperPtr create(
const MergeTreeTable & merge_tree_table,
const GlutenMergeTreeWriteSettings & write_settings_,
Expand Down
20 changes: 6 additions & 14 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,13 @@ namespace local_engine
{
SparkMergeTreeWriter::SparkMergeTreeWriter(
const MergeTreeTable & merge_tree_table, const GlutenMergeTreeWriteSettings & write_settings_, const DB::ContextPtr & context_)
: write_settings(write_settings_)
, dataWrapper(SinkHelper::create(merge_tree_table, write_settings, SerializedPlanParser::global_context))
, context(context_)
: dataWrapper(SinkHelper::create(merge_tree_table, write_settings_, SerializedPlanParser::global_context)), context(context_)
{
const DB::Settings & settings = context->getSettingsRef();
squashing
= std::make_unique<DB::Squashing>(dataWrapper->header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
if (!write_settings.partition_settings.partition_dir.empty())
extractPartitionValues(write_settings.partition_settings.partition_dir, partition_values);
if (!write_settings_.partition_settings.partition_dir.empty())
extractPartitionValues(write_settings_.partition_settings.partition_dir, partition_values);
}

void SparkMergeTreeWriter::write(const DB::Block & block)
Expand All @@ -68,9 +66,7 @@ void SparkMergeTreeWriter::write(const DB::Block & block)
const ExpressionActions expression_actions{std::move(converter)};
expression_actions.execute(new_block);

bool has_part = chunkToPart(squashing->add({new_block.getColumns(), new_block.rows()}));

if (has_part && write_settings.merge_after_insert)
if (chunkToPart(squashing->add({new_block.getColumns(), new_block.rows()})))
dataWrapper->checkAndMerge();
}

Expand Down Expand Up @@ -112,11 +108,7 @@ bool SparkMergeTreeWriter::blockToPart(Block & block)
void SparkMergeTreeWriter::finalize()
{
chunkToPart(squashing->flush());
if (write_settings.merge_after_insert)
dataWrapper->finalizeMerge();

dataWrapper->commit(context->getReadSettings(), context->getWriteSettings());
dataWrapper->saveMetadata(context);
dataWrapper->finish(context);
}

std::vector<PartInfo> SparkMergeTreeWriter::getAllPartInfo() const
Expand All @@ -133,7 +125,7 @@ std::vector<PartInfo> SparkMergeTreeWriter::getAllPartInfo() const
part->getBytesOnDisk(),
part->rows_count,
partition_values,
write_settings.partition_settings.bucket_dir});
dataWrapper->write_settings.partition_settings.bucket_dir});
}
return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class SparkMergeTreeWriter
bool chunkToPart(Chunk && plan_chunk);
bool blockToPart(Block & block);

const GlutenMergeTreeWriteSettings write_settings;
SinkHelperPtr dataWrapper;
DB::ContextPtr context;
std::unordered_map<String, String> partition_values;
Expand Down

0 comments on commit 717a5a6

Please sign in to comment.