Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Nov 27, 2023
1 parent ebdf882 commit 7b2042e
Show file tree
Hide file tree
Showing 4 changed files with 543 additions and 14 deletions.
397 changes: 397 additions & 0 deletions cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,397 @@
#include <iterator>
#include <type_traits>
#include <typeinfo>
#include <Operator/ScalableMergingAggregatedStep.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/CurrentThread.h>
#include <Common/formatReadable.h>

namespace DB
{
/// Declarations of the error codes used in this file; the constants themselves
/// are defined in another translation unit (hence `extern`).
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}

namespace local_engine
{
/// Builds the traits of the merging step.
/// The step reports a single output stream only when results must be produced in
/// bucket-number order; it never preserves the number of streams, the sorting,
/// or the number of rows.
static DB::ITransformingStep::Traits getTraits(bool should_produce_results_in_order_of_bucket_number)
{
    const bool single_stream = should_produce_results_in_order_of_bucket_number;
    DB::ITransformingStep::Traits traits{
        {
            .returns_single_stream = single_stream,
            .preserves_number_of_streams = false,
            .preserves_sorting = false,
        },
        {
            .preserves_number_of_rows = false,
        }};
    return traits;
}

/// Creates the query plan step.
///
/// @param context_ Query context, stored and later handed to every created transform.
/// @param input_stream_ Description of the input data stream.
/// @param params_ Aggregator parameters; used to compute the output header and then moved into the step.
/// @param should_produce_results_in_order_of_bucket_number_ Only used to build the step traits
///        (it decides `returns_single_stream`); it is not stored.
ScalableMergingAggregatedStep::ScalableMergingAggregatedStep(
const DB::ContextPtr context_,
const DB::DataStream & input_stream_,
DB::Aggregator::Params params_,
bool should_produce_results_in_order_of_bucket_number_)
/// The base class is initialized first, so `params_.getHeader(...)` is evaluated
/// before `params_` is moved into the `params` member below.
: DB::ITransformingStep(
input_stream_, params_.getHeader(input_stream_.header, true), getTraits(should_produce_results_in_order_of_bucket_number_))
, context(context_)
, params(std::move(params_))
{
}

/// Narrows the pipeline to a single stream and attaches one
/// ScalableMergingAggregatedTransform to every remaining output port.
void ScalableMergingAggregatedStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &)
{
    auto merging_params = std::make_shared<DB::AggregatingTransformParams>(pipeline.getHeader(), params, true);
    pipeline.resize(1);

    /// Invoked by `pipeline.transform` with the current output ports; returns the
    /// processors that were appended and wired to those ports.
    auto attach_merging_transforms = [&](DB::OutputPortRawPtrs ports)
    {
        DB::Processors appended;
        for (auto & port : ports)
        {
            auto merging_transform
                = std::make_shared<ScalableMergingAggregatedTransform>(pipeline.getHeader(), merging_params, context);
            appended.push_back(merging_transform);
            DB::connect(*port, merging_transform->getInputs().front());
        }
        return appended;
    };

    pipeline.transform(attach_merging_transforms);
}

/// Writes the textual description of the aggregation parameters into the
/// output stream of `settings`, using the offset carried by `settings`.
void ScalableMergingAggregatedStep::describeActions(DB::IQueryPlanStep::FormatSettings & settings) const
{
    auto & out = settings.out;
    params.explain(out, settings.offset);
}

/// JSON flavour of describeActions: forwards to the aggregator parameters,
/// which add their description to `map`.
void ScalableMergingAggregatedStep::describeActions(DB::JSONBuilder::JSONMap & map) const
{
params.explain(map);
}

/// Recomputes the output stream description from the first input stream,
/// using the header produced by the aggregator parameters.
void ScalableMergingAggregatedStep::updateOutputStream()
{
    const auto & first_input = input_streams.front();
    output_stream = createOutputStream(first_input, params.getHeader(first_input.header, true), getDataStreamTraits());
}

/// Creates the transform with one input port (header `header_`) and one output
/// port (header taken from `params_`).
///
/// @param header_ Header of the incoming chunks; also used to rebuild blocks from chunks.
/// @param params_ Shared aggregating transform parameters (aggregator and its settings).
/// @param context_ Query context; its temporary-data-on-disk scope is used for spilling.
ScalableMergingAggregatedTransform::ScalableMergingAggregatedTransform(
const DB::Block & header_, DB::AggregatingTransformParamsPtr params_, DB::ContextPtr context_)
: DB::IProcessor({header_}, {params_->getHeader()})
, header(header_)
, params(params_)
, context(context_)
, tmp_data_disk(std::make_unique<DB::TemporaryDataOnDisk>(context_->getTempDataOnDisk()))
{
/// Both tables are indexed by `bucket + 1`, so slot 0 belongs to bucket -1
/// and `max_bucket_number + 1` slots are required.
buckets_data_variants.resize(max_bucket_number + 1, nullptr);
bucket_tmp_files.resize(max_bucket_number + 1, nullptr);
}

bool ScalableMergingAggregatedTransform::isMemoryOverFlow()
{
UInt64 current_memory_usage = getMemoryUsage();
if (params->params.max_bytes_before_external_group_by && current_memory_usage > params->params.max_bytes_before_external_group_by)
{
LOG_INFO(
logger,
"Memory is overflow. current_memory_usage: {}, max_bytes_before_external_group_by: {}",
ReadableSize(current_memory_usage),
ReadableSize(params->params.max_bytes_before_external_group_by));
return true;
}
return false;
}

/// Switches the transform into OVERFLOW mode and marks the data as two-level.
/// Throws LOGICAL_ERROR for an aggregation without keys.
void ScalableMergingAggregatedTransform::swithMode()
{
    const bool has_keys = !params->params.keys.empty();
    if (!has_keys)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot switch mode for aggregation without keys");

    mode = OVERFLOW;
    has_two_level = true;
}

/// Appends `block` to the temporary file of `bucket`.
/// The block is tagged with the bucket number before it is written.
/// @return The value reported by the temporary file's `write`, or 0 for a block without rows.
size_t ScalableMergingAggregatedTransform::spillBucketDataToDisk(Int32 bucket, DB::Block block)
{
    if (block.rows() == 0)
        return 0;

    Stopwatch timer;
    block.info.bucket_num = bucket;
    auto * bucket_file = getBucketTempFile(bucket);
    auto written_bytes = bucket_file->write(block);
    LOG_TRACE(logger, "spilling one block to disk. bucket: {}, rows: {}, bytes: {}, time: {}", bucket, block.rows(), written_bytes, timer.elapsedMilliseconds());
    return written_bytes;
}

/// Port state machine of the transform.
///
/// Order of checks:
///  1. output finished        -> close the input and finish;
///  2. output cannot accept   -> stop requesting input, report PortFull;
///  3. a chunk is pending     -> push it, report PortFull;
///  4. input not yet finished -> pull and validate the next chunk, then ask for work();
///  5. input finished         -> finish once every bucket has been visited, otherwise ask for work().
///
/// Throws LOGICAL_ERROR when a chunk has no chunk info, has chunk info of a type other
/// than AggregatedChunkInfo, or is marked as an overflow block.
DB::IProcessor::Status ScalableMergingAggregatedTransform::prepare()
{
auto & output = outputs.front();
auto & input = inputs.front();
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}

/// A chunk produced by a previous work() call is waiting to be pushed.
if (has_output)
{
output.push(std::move(output_chunk));
has_output = false;
return Status::PortFull;
}

if (!input_finished)
{
/// When has_input is already set, the pulled chunk has not been consumed yet:
/// skip pulling and go straight to work().
if (!has_input)
{
if (input.isFinished())
{
/// Remember the end of input; the following work() calls emit the buckets.
input_finished = true;
return Status::Ready;
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;

input_chunk = input.pull(true);
const auto & info = input_chunk.getChunkInfo();
if (!info)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Chunk info was not set for chunk in ScalableMergingAggregatedTransform.");
if (const auto * agg_info = typeid_cast<const DB::AggregatedChunkInfo *>(info.get()))
{
/// bucket_num == -1 marks a single-level chunk, bucket_num >= 0 a two-level one.
has_single_level |= agg_info->bucket_num == -1;
has_two_level |= agg_info->bucket_num >= 0;
if (agg_info->is_overflows)
{
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Not support overflow blocks.");
}
}
else
{
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknow chunk info type.");
}

has_input = true;
}
return Status::Ready;
}
else
{
/// All aggregated data has been output.
if (current_bucket_num >= max_bucket_number)
{
output.finish();
return Status::Finished;
}
}
/// Input is finished but some buckets are still to be visited by work().
return Status::Ready;
}

/// Entry point called after prepare() returned Ready.
/// While input is pending, the memory limit is checked first (and the transform
/// switched to overflow mode if it is exceeded); then the actual work is done.
void ScalableMergingAggregatedTransform::work()
{
    /// The memory check is only evaluated when there is a pending input chunk.
    const bool must_switch_mode = has_input && isMemoryOverFlow();
    if (must_switch_mode)
        swithMode();

    workImpl();
}

/// Performs one unit of work.
///
/// - After the input is finished: walks the buckets starting at `current_bucket_num`
///   and produces at most one output chunk per call, either from the in-memory
///   state of the bucket or from its spilled file.
/// - While a pulled chunk is pending (`has_input`): merges it into the in-memory
///   bucket state, or spills it to disk when the bucket has no in-memory state and
///   must not be created.
void ScalableMergingAggregatedTransform::workImpl()
{
    if (input_finished)
    {
        /// Note: returning from inside the loop skips `++current_bucket_num`, so the same
        /// bucket is visited again on the next call. By then its in-memory state has been
        /// released, so the next visit goes to the spilled-file path.
        for (; current_bucket_num < max_bucket_number; ++current_bucket_num)
        {
            auto bucket_data_variants = getBucketDataVariants(current_bucket_num);
            if (bucket_data_variants)
            {
                Stopwatch watch;
                auto block = params->aggregator.convertToSingleBlock(*bucket_data_variants, true);
                releaseBucketDataVariants(current_bucket_num);
                output_chunk = blockToChunk(block);
                has_output = true;
                LOG_TRACE(logger, "load bucket data from memory. bucket: {}, rows: {}, bytes: {}, time: {}", current_bucket_num, block.rows(), ReadableSize(block.bytes()), watch.elapsedMilliseconds());
                return;
            }
            Stopwatch watch;
            auto block = loadBucketDataFromDiskAndMerge(current_bucket_num);
            if (block)
            {
                output_chunk = blockToChunk(block);
                has_output = true;
                LOG_INFO(
                    logger,
                    "load bucket data from disk and merge. bucket: {}, rows: {}, bytes: {}, time: {}",
                    current_bucket_num,
                    block.rows(),
                    ReadableSize(block.bytes()),
                    watch.elapsedMilliseconds());
                return;
            }
        }
    }
    else if (has_input)
    {
        /// Merges `block` into the in-memory state of `bucket_num` when that state exists
        /// (or may be created); otherwise appends the block to the bucket's spill file.
        auto add_block = [&](DB::Block & block, Int32 bucket_num, bool create_on_miss)
        {
            if (!block)
                return;
            auto bucket_data_variants = getBucketDataVariants(bucket_num, create_on_miss);
            if (bucket_data_variants)
            {
                params->aggregator.mergeOnBlock(block, *bucket_data_variants, no_more_keys);
            }
            else
            {
                spillBucketDataToDisk(bucket_num, block);
            }
        };

        /// Single-level data was accumulated in bucket -1, but the data is now two-level
        /// (or we are in overflow mode): redistribute bucket -1 over the two-level buckets.
        if (has_single_level && (has_two_level || mode == OVERFLOW))
        {
            auto bucket_data_variants = getBucketDataVariants(-1, false);
            if (bucket_data_variants)
            {
                auto blocks_list = params->aggregator.convertToBlocks(*bucket_data_variants, false, 1);
                if (blocks_list.size() > 1)
                {
                    /// Two-level hash table: the blocks already carry their bucket numbers.
                    for (auto & blk : blocks_list)
                        add_block(blk, blk.info.bucket_num, true);
                }
                else if (blocks_list.size() == 1)
                {
                    /// Single-level hash table: split the only block by bucket.
                    auto blocks_vector = params->aggregator.convertBlockToTwoLevel(blocks_list.front());
                    for (auto & blk : blocks_vector)
                        add_block(blk, blk.info.bucket_num, true);
                }
                releaseBucketDataVariants(-1);
            }
            has_single_level = false;
        }
        /// In case the memory overflow is caused by bucket -1, bucket -1 has been split into
        /// two-level buckets above first; only then is one bucket spilled here.
        if (isMemoryOverFlow() && mode == OVERFLOW)
        {
            spillOneBucket();
        }

        /// If we have at least one two-level block, transform all single-level blocks into two-level blocks.
        /// prepare() has already verified that the chunk info is an AggregatedChunkInfo.
        const auto * agg_info = typeid_cast<const DB::AggregatedChunkInfo *>(input_chunk.getChunkInfo().get());
        if (agg_info->bucket_num == -1 && (has_two_level || mode == OVERFLOW))
        {
            auto block = chunkToBlock(input_chunk);
            auto blocks_vector = params->aggregator.convertBlockToTwoLevel(block);
            for (auto & blk : blocks_vector)
            {
                /// In overflow mode no new in-memory bucket is created: missing buckets are spilled.
                add_block(blk, blk.info.bucket_num, mode != OVERFLOW);
            }
            has_single_level = false;
        }
        else
        {
            auto block = chunkToBlock(input_chunk);
            add_block(block, agg_info->bucket_num, mode != OVERFLOW);
        }
        input_chunk = {};
        has_input = false;
    }
}
/// Returns the in-memory aggregation state of `bucket`, or null when there is none.
/// With `create_on_miss` an empty state is created (and counted in
/// `in_memory_buckets_num`) for a bucket that has none. `size_hint` is unused.
DB::AggregatedDataVariantsPtr ScalableMergingAggregatedTransform::getBucketDataVariants(Int32 bucket, bool create_on_miss, size_t size_hint [[maybe_unused]])
{
    /// Slots are shifted by one so that bucket -1 lives in slot 0.
    const auto slot_index = static_cast<UInt32>(1 + bucket);
    auto & slot = buckets_data_variants[slot_index];
    if (create_on_miss && !slot)
    {
        in_memory_buckets_num += 1;
        slot = std::make_shared<DB::AggregatedDataVariants>();
    }
    return slot;
}

/// Drops the in-memory aggregation state of `bucket`.
/// The in-memory bucket counter is only decremented when a state was actually
/// held, so releasing an already-empty bucket cannot corrupt the counter that
/// spillOneBucket() relies on.
void ScalableMergingAggregatedTransform::releaseBucketDataVariants(Int32 bucket)
{
    /// Slots are shifted by one so that bucket -1 lives in slot 0.
    const auto index = static_cast<UInt32>(1 + bucket);
    auto & slot = buckets_data_variants[index];
    if (!slot)
        return;
    slot = nullptr;
    in_memory_buckets_num -= 1;
}

/// Reads back every block spilled for `bucket`, merges them into a fresh
/// aggregation state and returns the result as a single block.
/// Returns an empty block when the bucket has no spill file or the file yields no block.
DB::Block ScalableMergingAggregatedTransform::loadBucketDataFromDiskAndMerge(Int32 bucket)
{
    /// Slots are shifted by one so that bucket -1 lives in slot 0.
    const auto slot_index = static_cast<UInt32>(1 + bucket);
    auto & spill_file = bucket_tmp_files[slot_index];
    if (!spill_file)
        return {};

    spill_file->finishWriting();
    auto merged_variants = std::make_shared<DB::AggregatedDataVariants>();
    bool merged_anything = false;
    /// An empty block signals that the file has been read completely.
    for (auto spilled_block = spill_file->read(); spilled_block; spilled_block = spill_file->read())
    {
        merged_anything = true;
        params->aggregator.mergeOnBlock(spilled_block, *merged_variants, no_more_keys);
    }

    if (!merged_anything)
        return {};
    return params->aggregator.convertToSingleBlock(*merged_variants, true);
}

/// Converts a block into a chunk carrying the block's columns, and attaches an
/// AggregatedChunkInfo that mirrors the block's bucket number and overflow flag.
DB::Chunk ScalableMergingAggregatedTransform::blockToChunk(DB::Block & block)
{
    const UInt64 row_count = block.rows();
    DB::Chunk result(block.getColumns(), row_count);

    auto agg_info = std::make_shared<DB::AggregatedChunkInfo>();
    agg_info->is_overflows = block.info.is_overflows;
    agg_info->bucket_num = block.info.bucket_num;
    result.setChunkInfo(std::move(agg_info));

    return result;
}

/// Converts a chunk into a block with the transform's input header.
/// The columns are detached from `chunk`; when the chunk carries an
/// AggregatedChunkInfo, its bucket number and overflow flag are copied to the block.
DB::Block ScalableMergingAggregatedTransform::chunkToBlock(DB::Chunk & chunk)
{
    DB::Block result = header.cloneWithColumns(chunk.detachColumns());

    const auto & chunk_info = chunk.getChunkInfo();
    const auto * agg_info = typeid_cast<const DB::AggregatedChunkInfo *>(chunk_info.get());
    if (agg_info)
    {
        result.info.is_overflows = agg_info->is_overflows;
        result.info.bucket_num = agg_info->bucket_num;
    }
    return result;
}

/// Returns the amount reported by the parent of the current thread's memory tracker.
/// Returns 0 when the thread has no memory tracker or that tracker has no parent.
Int64 ScalableMergingAggregatedTransform::getMemoryUsage()
{
    auto * thread_tracker = DB::CurrentThread::getMemoryTracker();
    if (!thread_tracker)
        return 0;

    auto * parent_tracker = thread_tracker->getParent();
    if (!parent_tracker)
        return 0;

    const Int64 tracked_bytes = parent_tracker->get();
    return tracked_bytes;
}

/// Frees memory by moving one in-memory bucket to disk.
/// Slots are scanned from the highest index down to slot 1 (slot 0, i.e. bucket -1,
/// is never chosen); the first bucket found is converted to a single block, spilled
/// and released. Throws LOGICAL_ERROR when at most one bucket is held in memory.
void ScalableMergingAggregatedTransform::spillOneBucket()
{
    if (in_memory_buckets_num <= 1)
    {
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot spill one bucket.");
    }
    for (size_t i = max_bucket_number; i > 0; --i)
    {
        if (buckets_data_variants[i])
        {
            /// Slot `i` holds bucket `i - 1`.
            auto block = params->aggregator.convertToSingleBlock(*buckets_data_variants[i], false);
            spillBucketDataToDisk(i - 1, block);
            releaseBucketDataVariants(i - 1);
            break;
        }
    }
}
}
Loading

0 comments on commit 7b2042e

Please sign in to comment.