Skip to content

Commit

Permalink
remove SparkMergeTreeWriter::writeTempPartAndFinalize
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Sep 4, 2024
1 parent 7b6f4dd commit 8893a43
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 16 deletions.
20 changes: 19 additions & 1 deletion cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeDataWriter::writeTempPart(
/// Creates the sink that receives blocks written to this storage.
///
/// The AST argument and the `async_insert` flag are unnamed here and therefore unused.
/// Returns a SparkMergeTreeSink constructed from this storage, the given metadata
/// snapshot and `context`.
SinkToStoragePtr SparkStorageMergeTree::write(
const ASTPtr &, const StorageMetadataPtr & storage_in_memory_metadata, ContextPtr context, bool /*async_insert*/)
{
/// Seed the partition-level settings from `context`, then call load(context) on the whole settings object.
GlutenMergeTreeWriteSettings settings{.partition_settings{MergeTreePartitionWriteSettings::get(context)}};
settings.load(context);
/// NOTE(review): `sink_helper` is built with getContext() (not the `context` argument) and is
/// not referenced again in this function -- confirm whether it should be handed to the sink below.
SinkHelperPtr sink_helper = SinkHelper::create(table, settings, getContext());
#ifndef NDEBUG
/// Debug-only sanity check: the storage resolved for `table` must be this very object.
auto dest_storage = MergeTreeRelParser::getStorage(table, getContext());
assert(dest_storage.get() == this);
#endif

return std::make_shared<SparkMergeTreeSink>(*this, storage_in_memory_metadata, context);
}

Expand Down Expand Up @@ -254,9 +262,9 @@ SinkHelper::SinkHelper(const CustomStorageMergeTreePtr & data_, const GlutenMerg
: write_settings(write_settings_)
, data(data_)
, isRemoteStorage(isRemoteStorage_)
, thread_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1, 1, 100000)
, metadata_snapshot(data->getInMemoryMetadataPtr())
, header(metadata_snapshot->getSampleBlock())
, thread_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1, 1, 100000)
{
}

Expand Down Expand Up @@ -325,6 +333,16 @@ void SinkHelper::doMergePartsAsync(const std::vector<DB::MergeTreeDataPartPtr> &
watch.elapsedMilliseconds());
});
}
/// Writes one block as a temporary part and records the resulting part.
///
/// All arguments are forwarded unchanged to the `writer` of the storage returned by
/// dataRef(); the `part` member of the result is then appended to `new_parts`.
/// NOTE(review): the `metadata_snapshot` and `write_settings` parameters share their names
/// with members initialised in the SinkHelper constructor; inside this function the
/// parameters are the ones in effect.
void SinkHelper::writeTempPart(
    DB::BlockWithPartition & block_with_partition,
    const DB::StorageMetadataPtr & metadata_snapshot,
    const ContextPtr & context,
    const MergeTreePartitionWriteSettings & write_settings,
    int part_num)
{
    auto & part_writer = dataRef().writer;
    auto written = part_writer.writeTempPart(block_with_partition, metadata_snapshot, context, write_settings, part_num);
    new_parts.emplace_back(written.part);
}

void SinkHelper::checkAndMerge(bool force)
{
Expand Down
14 changes: 10 additions & 4 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,13 @@ class SparkMergeTreeDataWriter
};

class SparkMergeTreeSink;
class SinkHelper;
using SinkHelperPtr = std::shared_ptr<SinkHelper>;

class SparkStorageMergeTree final : public CustomStorageMergeTree
{
friend class SparkMergeTreeSink;
friend class SinkHelper;

public:
SparkStorageMergeTree(const MergeTreeTable & table_, const StorageInMemoryMetadata & metadata, const ContextMutablePtr & context_)
Expand Down Expand Up @@ -161,8 +164,6 @@ class ConcurrentDeque
mutable std::mutex mtx;
};

class SinkHelper;
using SinkHelperPtr = std::shared_ptr<SinkHelper>;
class SinkHelper
{
protected:
Expand All @@ -181,9 +182,15 @@ class SinkHelper
protected:
void doMergePartsAsync(const std::vector<DB::MergeTreeDataPartPtr> & prepare_merge_parts);
/// Overridable hook; the default implementation does nothing.
virtual void cleanup() { }
/// Returns the storage pointed to by `data`, cast to SparkStorageMergeTree via assert_cast.
SparkStorageMergeTree & dataRef() const { return assert_cast<SparkStorageMergeTree &>(*data); }

public:
/// Appends an already written part to `new_parts`.
void emplacePart(const DB::MergeTreeDataPartPtr & part) { new_parts.emplace_back(part); }
/// Writes `block_with_partition` as a temporary part through the storage's writer and
/// appends the produced part to `new_parts`.
/// The remaining arguments are passed through to the writer unchanged.
void writeTempPart(
DB::BlockWithPartition & block_with_partition,
const DB::StorageMetadataPtr & metadata_snapshot,
const ContextPtr & context,
const MergeTreePartitionWriteSettings & write_settings,
int part_num);

/// Exposes the deque held by `new_parts` through its unsafeGet().
/// NOTE(review): presumably this bypasses the container's locking -- confirm against ConcurrentDeque.
const std::deque<DB::MergeTreeDataPartPtr> & unsafeGet() const { return new_parts.unsafeGet(); }
void checkAndMerge(bool force = false);
Expand All @@ -198,7 +205,6 @@ class SinkHelper
const DB::ContextMutablePtr & context);

/// Returns the destination storage; the default is the storage pointed to by `data`.
/// Virtual, so a subclass may return a different storage.
virtual CustomStorageMergeTree & dest_storage() { return *data; }
/// Returns the storage pointed to by `data` as a CustomStorageMergeTree reference.
CustomStorageMergeTree & dataRef() const { return *data; }

/// Overridable commit hook; the default implementation ignores both settings and does nothing.
virtual void commit(const ReadSettings & read_settings, const WriteSettings & write_settings) { }
void saveMetadata(const DB::ContextPtr & context);
Expand Down
10 changes: 1 addition & 9 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ bool SparkMergeTreeWriter::blockToPart(Block & block)
CurrentThread::flushUntrackedMemory();
before_write_memory = memory_tracker->get();
}

dataWrapper->emplacePart(writeTempPartAndFinalize(item, dataWrapper->metadata_snapshot).part);
dataWrapper->writeTempPart(item, dataWrapper->metadata_snapshot, context, write_settings.partition_settings, part_num);
part_num++;
/// Reset earlier to free memory
item.block.clear();
Expand All @@ -120,13 +119,6 @@ void SparkMergeTreeWriter::finalize()
dataWrapper->saveMetadata(context);
}

/// Writes `block_with_partition` as a temporary part and returns it to the caller.
///
/// A fresh SparkMergeTreeDataWriter is constructed over dataWrapper->dataRef() on every call;
/// `context`, `write_settings.partition_settings` and `part_num` are taken from this writer's members.
/// NOTE(review): despite the name, no separate finalize step is visible in this function.
DB::MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPartAndFinalize(
DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot) const
{
const SparkMergeTreeDataWriter writer(dataWrapper->dataRef());
return writer.writeTempPart(block_with_partition, metadata_snapshot, context, write_settings.partition_settings, part_num);
}

std::vector<PartInfo> SparkMergeTreeWriter::getAllPartInfo() const
{
std::vector<PartInfo> res;
Expand Down
2 changes: 0 additions & 2 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ class SparkMergeTreeWriter
std::vector<PartInfo> getAllPartInfo() const;

private:
DB::MergeTreeDataWriter::TemporaryPart
writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot) const;
bool chunkToPart(Chunk && plan_chunk);
bool blockToPart(Block & block);

Expand Down

0 comments on commit 8893a43

Please sign in to comment.