From ddaa62a79a88653994f2dd63fc9a4b88b5d07ee1 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 6 Dec 2023 11:10:01 +0800 Subject: [PATCH] wip. 1206 --- .../Operator/GraceMergingAggregatedStep.cpp | 412 ++++++++++++++++++ .../Operator/GraceMergingAggregatedStep.h | 112 +++++ .../Operator/StreamingAggregatingStep.cpp | 290 ++++++++++++ .../Operator/StreamingAggregatingStep.h | 94 ++++ 4 files changed, 908 insertions(+) create mode 100644 cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp create mode 100644 cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.h create mode 100644 cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp create mode 100644 cpp-ch/local-engine/Operator/StreamingAggregatingStep.h diff --git a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp new file mode 100644 index 0000000000000..60c06e13da81b --- /dev/null +++ b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+#include "GraceMergingAggregatedStep.h"
+// NOTE(review): the targets of the five #include directives below are missing from this copy
+// of the patch; restore them from the repository before applying.
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+}
+
+namespace local_engine
+{
+/// Traits of the step: stream count, sorting and row count are all declared as not preserved.
+static DB::ITransformingStep::Traits getTraits()
+{
+    return DB::ITransformingStep::Traits
+    {
+        {
+            .preserves_number_of_streams = false,
+            .preserves_sorting = false,
+        },
+        {
+            .preserves_number_of_rows = false,
+        }
+    };
+}
+
+/// The output header is the aggregator header computed with final = true.
+GraceMergingAggregatedStep::GraceMergingAggregatedStep(
+    DB::ContextPtr context_,
+    const DB::DataStream & input_stream_,
+    DB::Aggregator::Params params_)
+    : DB::ITransformingStep(
+        input_stream_, params_.getHeader(input_stream_.header, true), getTraits())
+    , context(context_)
+    , params(std::move(params_))
+{
+}
+
+/// Resizes the pipeline to one stream and attaches one GraceMergingAggregatedTransform to every output port.
+void GraceMergingAggregatedStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &)
+{
+    auto transform_params = std::make_shared<DB::AggregatingTransformParams>(pipeline.getHeader(), params, true);
+    pipeline.resize(1);
+    auto build_transform = [&](DB::OutputPortRawPtrs outputs)
+    {
+        DB::Processors new_processors;
+        for (auto & output : outputs)
+        {
+            auto op = std::make_shared<GraceMergingAggregatedTransform>(pipeline.getHeader(), transform_params, context);
+            new_processors.push_back(op);
+            DB::connect(*output, op->getInputs().front());
+        }
+        return new_processors;
+    };
+    pipeline.transform(build_transform);
+}
+
+void GraceMergingAggregatedStep::describeActions(DB::IQueryPlanStep::FormatSettings & settings) const
+{
+    return params.explain(settings.out, settings.offset);
+}
+
+void GraceMergingAggregatedStep::describeActions(DB::JSONBuilder::JSONMap & map) const
+{
+    params.explain(map);
+}
+
+void GraceMergingAggregatedStep::updateOutputStream()
+{
+    output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, true), getDataStreamTraits());
+}
+
+GraceMergingAggregatedTransform::GraceMergingAggregatedTransform(const DB::Block &header_, DB::AggregatingTransformParamsPtr params_, DB::ContextPtr context_)
+    : IProcessor({header_}, {params_->getHeader()})
+    , header(header_)
+    , params(params_)
+    , context(context_)
+    , tmp_data_disk(std::make_unique<DB::TemporaryDataOnDisk>(context_->getTempDataOnDisk()))
+{
+    current_data_variants = std::make_shared<DB::AggregatedDataVariants>();
+    // bucket 0 is for in-memory data, it's just a placeholder.
+    buckets.emplace(0, BufferFileStream());
+}
+
+/// Logs the counters collected during the lifetime of the transform.
+GraceMergingAggregatedTransform::~GraceMergingAggregatedTransform()
+{
+    LOG_INFO(
+        logger,
+        "Metrics. total_input_blocks: {}, total_input_rows: {}, total_output_blocks: {}, total_output_rows: {}, total_spill_disk_bytes: "
+        "{}, total_spill_disk_time: {}, total_read_disk_time: {}, total_scatter_time: {}",
+        total_input_blocks,
+        total_input_rows,
+        total_output_blocks,
+        total_output_rows,
+        total_spill_disk_bytes,
+        total_spill_disk_time,
+        total_read_disk_time,
+        total_scatter_time);
+}
+
+/// Port state machine: push a pending output chunk first, then consume input until it is
+/// finished, then keep asking for work() until every bucket has been emitted.
+GraceMergingAggregatedTransform::Status GraceMergingAggregatedTransform::prepare()
+{
+    auto & output = outputs.front();
+    auto & input = inputs.front();
+    if (output.isFinished())
+    {
+        input.close();
+        return Status::Finished;
+    }
+    if (has_output)
+    {
+        if (output.canPush())
+        {
+            total_output_rows += output_chunk.getNumRows();
+            total_output_blocks++;
+            output.push(std::move(output_chunk));
+            has_output = false;
+        }
+        return Status::PortFull;
+    }
+
+    if (has_input)
+        return Status::Ready;
+
+    if (!input_finished)
+    {
+        if (input.isFinished())
+        {
+            input_finished = true;
+            return Status::Ready;
+        }
+        input.setNeeded();
+        if (!input.hasData())
+            return Status::NeedData;
+        input_chunk = input.pull(true);
+        total_input_rows += input_chunk.getNumRows();
+        total_input_blocks++;
+        has_input = true;
+        return Status::Ready;
+    }
+
+    // All buckets have been converted and all of their blocks were pushed.
+    if (current_bucket_index >= getBucketsNum() && current_final_blocks.empty())
+    {
+        output.finish();
+        return Status::Finished;
+    }
+    return Status::Ready;
+}
+
+/// Either merges the pulled chunk into the current bucket, or (after the input is finished)
+/// produces the next output chunk, loading the next bucket when the current one is drained.
+void GraceMergingAggregatedTransform::work()
+{
+    if (has_input)
+    {
+        assert(!input_finished);
+        auto block = header.cloneWithColumns(input_chunk.detachColumns());
+        mergeOneBlock(block);
+        has_input = false;
+    }
+    else
+    {
+        assert(input_finished);
+        // Moves the first non-empty final block into output_chunk; empty blocks are dropped.
+        auto pop_one_chunk = [&]()
+        {
+            while (!current_final_blocks.empty())
+            {
+                if (!current_final_blocks.front().rows())
+                {
+                    current_final_blocks.pop_front();
+                    continue;
+                }
+
+                auto & block = current_final_blocks.front();
+                output_chunk = DB::Chunk(block.getColumns(), block.rows());
+                current_final_blocks.pop_front();
+                has_output = true;
+                return;
+            }
+        };
+
+        if (current_final_blocks.empty())
+        {
+            if (current_bucket_index >= getBucketsNum())
+                return;
+            prepareBucketOutputBlocks();
+            current_bucket_index++;
+            current_data_variants = nullptr;
+        }
+        pop_one_chunk();
+    }
+}
+
+/// Doubles the number of buckets. Throws LOGICAL_ERROR when the result would exceed max_buckets.
+void GraceMergingAggregatedTransform::extendBuckets()
+{
+    auto current_size = getBucketsNum();
+    auto next_size = current_size * 2;
+    if (next_size > max_buckets)
+        throw DB::Exception(
+            DB::ErrorCodes::LOGICAL_ERROR,
+            "Too many buckets, limit is {}. Please consider increasing the off-heap memory size or the partition number",
+            max_buckets);
+    LOG_INFO(logger, "extend buckets from {} to {}", current_size, next_size);
+    for (size_t i = current_size; i < next_size; ++i)
+        buckets.emplace(i, BufferFileStream());
+}
+
+/// Redistributes the in-memory aggregation state over the current bucket count: rows of later
+/// buckets are queued for spilling, rows of the current bucket are merged into a fresh state.
+void GraceMergingAggregatedTransform::rehashDataVariants()
+{
+    auto blocks = params->aggregator.convertToBlocks(*current_data_variants, false, 1);
+    current_data_variants = std::make_shared<DB::AggregatedDataVariants>();
+    no_more_keys = false;
+    for (auto & block : blocks)
+    {
+        // scatterBlock() returns an empty vector for an empty block; indexing it below would be out of range.
+        if (!block.rows())
+            continue;
+        // These rows were never scattered with the new bucket count. Drop whatever bucket_num the
+        // aggregator left in the block, otherwise scatterBlock() may mistake it for the
+        // "already scattered" tag and return a single block.
+        block.info.bucket_num = -1;
+        auto scattered_blocks = scatterBlock(block);
+        block = {};
+        for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i)
+        {
+            addBlockIntoFileBucket(i, scattered_blocks[i]);
+            scattered_blocks[i] = {};
+        }
+        params->aggregator.mergeOnBlock(scattered_blocks[current_bucket_index], *current_data_variants, no_more_keys);
+    }
+}
+
+/// Splits a block by key hash into getBucketsNum() blocks and tags each of them with the bucket
+/// count in info.bucket_num. Returns {} for an empty block and {block} when the block already
+/// carries the current bucket count as its tag.
+DB::Blocks GraceMergingAggregatedTransform::scatterBlock(const DB::Block & block)
+{
+    if (!block.rows())
+        return {};
+    Stopwatch watch;
+    size_t bucket_num = getBucketsNum();
+    if (static_cast<size_t>(block.info.bucket_num) == bucket_num)
+        return {block};
+    auto blocks = DB::JoinCommon::scatterBlockByHash(params->params.keys, block, bucket_num);
+    for (auto & new_block : blocks)
+    {
+        new_block.info.bucket_num = static_cast<Int32>(bucket_num);
+    }
+    total_scatter_time += watch.elapsedMilliseconds();
+    return blocks;
+}
+
+/// Queues a non-empty block in the in-memory buffer of the given bucket.
+void GraceMergingAggregatedTransform::addBlockIntoFileBucket(size_t bucket_index, const DB::Block & block)
+{
+    if (!block.rows())
+        return;
+    auto & file_stream = buckets[bucket_index];
+    file_stream.blocks.push_back(block);
+}
+
+/// Writes the buffered blocks of every bucket after the current one to disk and updates the spill metrics.
+void GraceMergingAggregatedTransform::flushBuckets()
+{
+    auto before_mem = getMemoryUsage();
+    size_t flush_bytes = 0;
+    Stopwatch watch;
+    for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i)
+        flush_bytes += flushBucket(i);
+    total_spill_disk_time += watch.elapsedMilliseconds();
+    total_spill_disk_bytes += flush_bytes;
+    LOG_INFO(logger, "flush {} in {} ms, memory usage: {} -> {}", ReadableSize(flush_bytes), watch.elapsedMilliseconds(), ReadableSize(before_mem), ReadableSize(getMemoryUsage()));
+}
+
+/// Writes the buffered blocks of one bucket to its temporary file, creating the file on first use.
+/// Returns the number of bytes of the merged blocks.
+size_t GraceMergingAggregatedTransform::flushBucket(size_t bucket_index)
+{
+    auto & file_stream = buckets[bucket_index];
+    if (file_stream.blocks.empty())
+        return 0;
+    if (!file_stream.file_stream)
+        file_stream.file_stream = &tmp_data_disk->createStream(header);
+    DB::Blocks blocks;
+    size_t flush_bytes = 0;
+    while (!file_stream.blocks.empty())
+    {
+        // Collect a run of consecutive blocks that share the same bucket_num tag, so that the
+        // concatenated block can keep that tag.
+        while (!file_stream.blocks.empty())
+        {
+            if (!blocks.empty() && blocks.back().info.bucket_num != file_stream.blocks.front().info.bucket_num)
+                break;
+            blocks.push_back(file_stream.blocks.front());
+            file_stream.blocks.pop_front();
+        }
+        auto bucket = blocks.front().info.bucket_num;
+        auto merged_block = DB::concatenateBlocks(blocks);
+        merged_block.info.bucket_num = bucket;
+        blocks.clear();
+        flush_bytes += merged_block.bytes();
+        if (merged_block.rows())
+        {
+            file_stream.file_stream->write(merged_block);
+        }
+    }
+    return flush_bytes;
+}
+
+/// Merges everything that belongs to the current bucket (spilled file first, then the in-memory
+/// buffer) and converts the result into the final blocks stored in current_final_blocks.
+void GraceMergingAggregatedTransform::prepareBucketOutputBlocks()
+{
+    size_t read_bytes = 0;
+    size_t read_rows = 0;
+    Stopwatch watch;
+    if (!current_data_variants)
+    {
+        current_data_variants = std::make_shared<DB::AggregatedDataVariants>();
+        no_more_keys = false;
+    }
+    auto & buffer_file_stream = buckets[current_bucket_index];
+
+    if (buffer_file_stream.file_stream)
+    {
+        buffer_file_stream.file_stream->finishWriting();
+        while (true)
+        {
+            auto block = buffer_file_stream.file_stream->read();
+            if (!block.rows())
+                break;
+            read_bytes += block.bytes();
+            read_rows += block.rows();
+            mergeOneBlock(block);
+            block = {};
+        }
+        buffer_file_stream.file_stream = nullptr;
+        total_read_disk_time += watch.elapsedMilliseconds();
+    }
+    if (!buffer_file_stream.blocks.empty())
+    {
+        for (auto & block : buffer_file_stream.blocks)
+        {
+            mergeOneBlock(block);
+            block = {};
+        }
+    }
+    current_final_blocks = params->aggregator.convertToBlocks(*current_data_variants, true, 1);
+    LOG_INFO(logger, "prepare to output bucket {}, read bytes: {}, read rows: {}, time: {} ms", current_bucket_index, ReadableSize(read_bytes), read_rows, watch.elapsedMilliseconds());
+}
+
+/// Merges one block into the current bucket. When memory is over the limit the buffered buckets
+/// are flushed first; if that is not enough the bucket count is doubled and the in-memory state
+/// is redistributed.
+void GraceMergingAggregatedTransform::mergeOneBlock(const DB::Block &block)
+{
+    if (!block.rows())
+        return;
+
+    if (isMemoryOverFlow())
+        flushBuckets();
+
+    if (isMemoryOverFlow())
+    {
+        extendBuckets();
+        rehashDataVariants();
+    }
+
+    LOG_TRACE(
+        logger,
+        "merge on block, rows: {}, bytes:{}, bucket: {}. current bucket: {}, total bucket: {}, mem used: {}",
+        block.rows(),
+        ReadableSize(block.bytes()),
+        block.info.bucket_num,
+        current_bucket_index,
+        getBucketsNum(),
+        ReadableSize(getMemoryUsage()));
+
+    // A block tagged with the current bucket count was already scattered; with a single bucket
+    // there is nothing to scatter.
+    if (block.info.bucket_num == static_cast<Int32>(getBucketsNum()) || getBucketsNum() == 1)
+    {
+        params->aggregator.mergeOnBlock(block, *current_data_variants, no_more_keys);
+    }
+    else
+    {
+        auto scattered_blocks = scatterBlock(block);
+        for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i)
+        {
+            addBlockIntoFileBucket(i, scattered_blocks[i]);
+        }
+        params->aggregator.mergeOnBlock(scattered_blocks[current_bucket_index], *current_data_variants, no_more_keys);
+    }
+}
+
+/// Returns the value of the parent of the current thread's memory tracker, or 0 when there is none.
+size_t GraceMergingAggregatedTransform::getMemoryUsage()
+{
+    size_t current_memory_usage = 0;
+    if (auto * memory_tracker_child = DB::CurrentThread::getMemoryTracker())
+        if (auto * memory_tracker = memory_tracker_child->getParent())
+            current_memory_usage = memory_tracker->get();
+    return current_memory_usage;
+}
+
+/// True when max_bytes_before_external_group_by is non-zero and the tracked memory exceeds it.
+bool GraceMergingAggregatedTransform::isMemoryOverFlow()
+{
+    auto mem_limit = context->getSettingsRef().max_bytes_before_external_group_by;
+    if (!mem_limit)
+        return false;
+    auto current_memory_usage = getMemoryUsage();
+    if (current_memory_usage > mem_limit)
+    {
+        LOG_INFO(logger, "memory is overflow. used: {}, limit: {}", ReadableSize(current_memory_usage), ReadableSize(mem_limit));
+        return true;
+    }
+    return false;
+}
+}
diff --git a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.h b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.h
new file mode 100644
index 0000000000000..71619476ce4d9
--- /dev/null
+++ b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.h
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+// NOTE(review): the targets of the #include directives below are missing from this copy of the
+// patch; restore them from the repository before applying.
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace local_engine
+{
+/// Query plan step that puts a GraceMergingAggregatedTransform on a pipeline resized to one stream.
+class GraceMergingAggregatedStep : public DB::ITransformingStep
+{
+public:
+    explicit GraceMergingAggregatedStep(
+        DB::ContextPtr context_,
+        const DB::DataStream & input_stream_,
+        DB::Aggregator::Params params_);
+    ~GraceMergingAggregatedStep() override = default;
+
+    String getName() const override { return "GraceMergingAggregatedStep"; }
+
+    void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) override;
+
+    void describeActions(DB::JSONBuilder::JSONMap & map) const override;
+    void describeActions(DB::IQueryPlanStep::FormatSettings & settings) const override;
+private:
+    DB::ContextPtr context;
+    DB::Aggregator::Params params;
+    void updateOutputStream() override;
+};
+
+/// Processor that merges aggregated blocks bucket by bucket. Rows are scattered by key hash into
+/// buckets; only the current bucket is merged in memory, the blocks of later buckets are buffered
+/// and written to temporary files when the tracked memory exceeds
+/// max_bytes_before_external_group_by. The bucket count doubles on demand up to max_buckets.
+class GraceMergingAggregatedTransform : public DB::IProcessor
+{
+public:
+    /// Upper bound for the bucket count; extendBuckets() throws when doubling would exceed it.
+    static constexpr size_t max_buckets = 32;
+    using Status = DB::IProcessor::Status;
+    explicit GraceMergingAggregatedTransform(const DB::Block &header_, DB::AggregatingTransformParamsPtr params_, DB::ContextPtr context_);
+    ~GraceMergingAggregatedTransform() override;
+
+    Status prepare() override;
+    void work() override;
+    String getName() const override { return "GraceMergingAggregatedTransform"; }
+private:
+    DB::Block header;
+    DB::AggregatingTransformParamsPtr params;
+    DB::ContextPtr context;
+    DB::TemporaryDataOnDiskPtr tmp_data_disk;
+    /// Aggregation state of the bucket that is currently merged in memory.
+    DB::AggregatedDataVariantsPtr current_data_variants = nullptr;
+    size_t current_bucket_index = 0;
+
+    /// Blocks waiting to be spilled for one bucket, plus the temporary file they are written to.
+    struct BufferFileStream
+    {
+        std::list<DB::Block> blocks;
+        /// Not owned here; obtained from tmp_data_disk->createStream().
+        DB::TemporaryFileStream * file_stream = nullptr;
+    };
+    /// Keyed by bucket index; the entries are always 0 .. getBucketsNum() - 1.
+    std::unordered_map<size_t, BufferFileStream> buckets;
+
+    size_t getBucketsNum() const { return buckets.size(); }
+    void extendBuckets();
+    void rehashDataVariants();
+    DB::Blocks scatterBlock(const DB::Block & block);
+    void addBlockIntoFileBucket(size_t bucket_index, const DB::Block & block);
+    void flushBuckets();
+    size_t flushBucket(size_t bucket_index);
+    void prepareBucketOutputBlocks();
+    void mergeOneBlock(const DB::Block &block);
+    size_t getMemoryUsage();
+    bool isMemoryOverFlow();
+
+    // State shared between prepare() and work().
+    bool input_finished = false;
+    bool has_input = false;
+    DB::Chunk input_chunk;
+    bool has_output = false;
+    DB::Chunk output_chunk;
+    /// Final blocks of the bucket that is being emitted.
+    DB::BlocksList current_final_blocks;
+    bool no_more_keys = false;
+
+    // metrics
+    size_t total_input_blocks = 0;
+    size_t total_input_rows = 0;
+    size_t total_output_blocks = 0;
+    size_t total_output_rows = 0;
+    size_t total_spill_disk_bytes = 0;
+    size_t total_spill_disk_time = 0;
+    size_t total_read_disk_time = 0;
+    size_t total_scatter_time = 0;
+
+    Poco::Logger * logger = &Poco::Logger::get("GraceMergingAggregatedTransform");
+};
+}
diff --git a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp
new file mode 100644
index 0000000000000..a0dfd27401aa0
--- /dev/null
+++ b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StreamingAggregatingStep.h"
+// NOTE(review): the targets of the five #include directives below are missing from this copy
+// of the patch; restore them from the repository before applying.
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+}
+
+namespace local_engine
+{
+StreamingAggregatingTransform::StreamingAggregatingTransform(DB::ContextPtr context_, const DB::Block &header_, DB::AggregatingTransformParamsPtr params_)
+    : DB::IProcessor({header_}, {params_->getHeader()})
+    , context(context_)
+    , header(header_)
+    , key_columns(params_->params.keys_size)
+    , aggregate_columns(params_->params.aggregates_size)
+    , params(params_)
+{
+    // `params` keeps the object passed by the caller: work() aggregates through
+    // params->aggregator, so it must not be replaced here.
+    data_variants = std::make_shared<DB::AggregatedDataVariants>();
+}
+
+StreamingAggregatingTransform::~StreamingAggregatingTransform()
+{
+    LOG_INFO(logger, "xxx input rows: {}, output rows: {}", input_rows, output_rows);
+}
+
+/// Port state machine. Order of checks: closed output, pending output chunk, pending work on the
+/// current input / bucket flush, early stop requested by the aggregator, end of input, new input.
+StreamingAggregatingTransform::Status StreamingAggregatingTransform::prepare()
+{
+    auto & output = outputs.front();
+    auto & input = inputs.front();
+    if (output.isFinished())
+    {
+        input.close();
+        return Status::Finished;
+    }
+    if (has_output)
+    {
+        if (output.canPush())
+        {
+            output_rows += output_chunk.getNumRows();
+            output.push(std::move(output_chunk));
+            has_output = false;
+        }
+        return Status::PortFull;
+    }
+
+    if (has_input)
+        return Status::Ready;
+
+    if (is_consume_finished)
+    {
+        input.close();
+        output.finish();
+        return Status::Finished;
+    }
+
+    if (input.isFinished())
+    {
+        is_input_finished = true;
+        // Finish only when the aggregation state is empty and every block converted from it
+        // has been emitted; otherwise work() still has something to produce.
+        if (is_clear_aggregator && pending_blocks.empty())
+        {
+            output.finish();
+            return Status::Finished;
+        }
+        return Status::Ready;
+    }
+    input.setNeeded();
+    if (!input.hasData())
+    {
+        return Status::NeedData;
+    }
+    input_chunk = input.pull(true);
+    input_rows += input_chunk.getNumRows();
+    LOG_TRACE(logger, "xxx input one chunk. rows: {}", input_chunk.getNumRows());
+    has_input = true;
+    return Status::Ready;
+}
+
+/// Returns the value of the parent of the current thread's memory tracker, or 0 when there is none.
+static UInt64 getMemoryUsage()
+{
+    Int64 current_memory_usage = 0;
+    if (auto * memory_tracker_child = DB::CurrentThread::getMemoryTracker())
+        if (auto * memory_tracker = memory_tracker_child->getParent())
+            current_memory_usage = memory_tracker->get();
+    return current_memory_usage;
+}
+
+/// Number of buckets of a two-level aggregation method.
+template <typename Method>
+static Int32 getAggregatedDataVariantsBucketNum(Method & method [[maybe_unused]])
+{
+    return Method::Data::NUM_BUCKETS;
+}
+
+/// True when max_bytes_before_external_group_by is non-zero and the tracked memory exceeds it.
+/// A limit of 0 means "no limit", as in GraceMergingAggregatedTransform::isMemoryOverFlow().
+bool StreamingAggregatingTransform::isMemoryOverFlow()
+{
+    auto mem_limit = context->getSettingsRef().max_bytes_before_external_group_by;
+    if (!mem_limit)
+        return false;
+    return getMemoryUsage() > mem_limit;
+}
+
+/// Three modes:
+///  - a two-level flush is in progress (pending_buckets > 0): emit one bucket per call;
+///  - a new input chunk is available: aggregate it and, if the aggregator asks to stop or fails
+///    to spill, evict the whole aggregation state to the output;
+///  - the input is finished: convert the remaining state once and emit it block by block.
+void StreamingAggregatingTransform::work()
+{
+    if (has_input)
+    {
+        if (pending_buckets > 0)
+        {
+            if (input_chunk.hasRows())
+            {
+                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "input chunk must be empty here, but got: {}", input_chunk.dumpStructure());
+            }
+            output_chunk = DB::convertToChunk(params->aggregator.convertOneBucketToBlock(*data_variants, data_variants->aggregates_pool, false, pending_buckets - 1));
+            has_output = true;
+            total_data_variants_rows += output_chunk.getNumRows();
+            total_data_variants_bytes += output_chunk.bytes();
+            LOG_TRACE(logger, "xxx flush one bucket. rows: {}, bytes: {}, bucket: {}", output_chunk.getNumRows(), ReadableSize(output_chunk.bytes()), pending_buckets - 1);
+
+            pending_buckets -= 1;
+            if (pending_buckets <= 0)
+            {
+                auto mem1 = getMemoryUsage();
+                is_clear_aggregator = true;
+                data_variants = std::make_shared<DB::AggregatedDataVariants>();
+                LOG_INFO(logger, "xxx flush one variant. rows: {}, bytes: {}, mem used: {} -> {}", total_data_variants_rows, ReadableSize(total_data_variants_bytes), ReadableSize(mem1), ReadableSize(getMemoryUsage()));
+                total_data_variants_rows = 0;
+                total_data_variants_bytes = 0;
+            }
+            has_input = pending_buckets > 0;
+        }
+        else
+        {
+            const UInt64 num_rows = input_chunk.getNumRows();
+            bool need_to_evict_aggregator = false;
+            try
+            {
+                is_clear_aggregator = false;
+                if (!params->aggregator.executeOnBlock(
+                        input_chunk.detachColumns(), 0, num_rows, *data_variants, key_columns, aggregate_columns, no_more_keys))
+                {
+                    is_consume_finished = true;
+                    need_to_evict_aggregator = true;
+                }
+                LOG_TRACE(logger, "xxxx mem used: {} after merging one chunk", ReadableSize(getMemoryUsage()));
+                input_chunk = {};
+            }
+            catch (const DB::Exception & e)
+            {
+                // The aggregator tried to spill but has no temporary file: treat this as the
+                // signal to evict the state downstream instead of failing the query.
+                if (e.code() == DB::ErrorCodes::LOGICAL_ERROR
+                    && e.message() == "Cannot write to temporary file because temporary file is not initialized")
+                {
+                    LOG_INFO(logger, "xxx exception on spill data into disk. mem limit: {}", ReadableSize(params->params.max_bytes_before_external_group_by));
+                    need_to_evict_aggregator = true;
+                    input_chunk = {};
+                }
+                else
+                {
+                    LOG_ERROR(logger, "xxx unknown exception: {}", e.message());
+                    throw;
+                }
+            }
+            if (need_to_evict_aggregator)
+            {
+                LOG_INFO(logger, "xxx 2. prepare to clear data variants. size: {}", data_variants->size());
+                if (data_variants->isTwoLevel())
+                {
+                    prepareFlushOneTwoLevelDataVariants();
+                }
+                else
+                {
+                    auto block = params->aggregator.convertToSingleBlock(*data_variants, false);
+                    output_chunk = DB::convertToChunk(block);
+                    has_output = true;
+                    has_input = false;
+                    data_variants = std::make_shared<DB::AggregatedDataVariants>();
+                    is_clear_aggregator = true;
+                }
+            }
+        }
+        // Stay in "has input" mode while buckets of a two-level flush are still pending.
+        has_input = pending_buckets > 0;
+    }
+    else
+    {
+        if (!is_input_finished)
+            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "input should be finished");
+        if (!is_clear_aggregator)
+        {
+            // First call after end of input: take what is left in the aggregation state.
+            pending_blocks = params->aggregator.convertToBlocks(*data_variants, false, 1);
+            data_variants = std::make_shared<DB::AggregatedDataVariants>();
+            is_clear_aggregator = true;
+        }
+        // Emit one non-empty block per call; prepare() finishes once the list is drained.
+        while (!pending_blocks.empty() && !has_output)
+        {
+            auto block = std::move(pending_blocks.front());
+            pending_blocks.pop_front();
+            if (!block.rows())
+                continue;
+            output_chunk = DB::convertToChunk(block);
+            has_output = true;
+        }
+    }
+}
+
+/// Starts a bucket-by-bucket flush: sets pending_buckets to the bucket count of the active
+/// two-level method and keeps the transform in "has input" mode while buckets remain.
+void StreamingAggregatingTransform::prepareFlushOneTwoLevelDataVariants()
+{
+#define M(NAME) \
+    else if (data_variants->type == DB::AggregatedDataVariants::Type::NAME) \
+    { \
+        pending_buckets = getAggregatedDataVariantsBucketNum(*(data_variants->NAME)); \
+    }
+
+    if (false) {} // NOLINT
+    APPLY_FOR_VARIANTS_TWO_LEVEL(M)
+#undef M
+    has_output = false;
+    has_input = pending_buckets > 0;
+}
+
+/// Traits of the step: stream count, sorting and row count are all declared as not preserved.
+static DB::ITransformingStep::Traits getTraits()
+{
+    return DB::ITransformingStep::Traits
+    {
+        {
+            .preserves_number_of_streams = false,
+            .preserves_sorting = false,
+        },
+        {
+            .preserves_number_of_rows = false,
+        }
+    };
+}
+
+/// The output header is the aggregator header computed with final = false.
+StreamingAggregatingStep::StreamingAggregatingStep(
+    DB::ContextPtr context_,
+    const DB::DataStream & input_stream_,
+    DB::Aggregator::Params params_)
+    : DB::ITransformingStep(
+        input_stream_, params_.getHeader(input_stream_.header, false), getTraits())
+    , context(context_)
+    , params(std::move(params_))
+{
+}
+
+/// Drops totals/extremes, resizes the pipeline to one stream and attaches one
+/// StreamingAggregatingTransform to every output port.
+void StreamingAggregatingStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &)
+{
+    pipeline.dropTotalsAndExtremes();
+    auto transform_params = std::make_shared<DB::AggregatingTransformParams>(pipeline.getHeader(), params, false);
+    pipeline.resize(1);
+    auto build_transform = [&](DB::OutputPortRawPtrs outputs)
+    {
+        DB::Processors new_processors;
+        for (auto & output : outputs)
+        {
+            auto op = std::make_shared<StreamingAggregatingTransform>(context, pipeline.getHeader(), transform_params);
+            new_processors.push_back(op);
+            DB::connect(*output, op->getInputs().front());
+        }
+        return new_processors;
+    };
+    pipeline.transform(build_transform);
+}
+
+void StreamingAggregatingStep::describeActions(DB::IQueryPlanStep::FormatSettings & settings) const
+{
+    return params.explain(settings.out, settings.offset);
+}
+
+void StreamingAggregatingStep::describeActions(DB::JSONBuilder::JSONMap & map) const
+{
+    params.explain(map);
+}
+
+void StreamingAggregatingStep::updateOutputStream()
+{
+    // final = false, matching the header computed in the constructor and in transformPipeline().
+    output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, false), getDataStreamTraits());
+}
+
+}
diff --git a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.h b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.h
new file mode 100644
index 0000000000000..9b7493bf36cd1
--- /dev/null
+++ b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.h
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace local_engine
+{
+class StreamingAggregatingTransform : public DB::IProcessor // processor whose prepare()/work() are implemented in StreamingAggregatingStep.cpp
+{
+public:
+    using Status = DB::IProcessor::Status;
+    explicit StreamingAggregatingTransform(DB::ContextPtr context_, const DB::Block &header_, DB::AggregatingTransformParamsPtr params_);
+    ~StreamingAggregatingTransform() override; // logs the input/output row counters
+    String getName() const override { return "StreamingAggregatingTransform"; }
+    Status prepare() override;
+    void work() override;
+private:
+    DB::ContextPtr context;
+    DB::Block header;
+    DB::ColumnRawPtrs key_columns; // sized from params_->params.keys_size in the constructor
+    DB::Aggregator::AggregateColumns aggregate_columns; // sized from params_->params.aggregates_size in the constructor
+    DB::AggregatingTransformParamsPtr params;
+
+    bool no_more_keys = false;
+    bool is_consume_finished = false; // set when executeOnBlock() returns false; prepare() then finishes the ports
+    bool is_input_finished = false; // set by prepare() once the input port reports isFinished()
+    bool is_clear_aggregator = false; // set after the aggregation state was flushed and replaced by a fresh one
+    DB::AggregatedDataVariantsPtr data_variants; // NOTE(review): StreamingAggregatingStep.cpp in this patch mostly refers to `variants`, which is not declared here - confirm the intended name
+    bool has_input = false;
+    bool has_output = false;
+    DB::Chunk input_chunk;
+    DB::Chunk output_chunk;
+
+    DB::BlocksList pending_blocks;
+    Int32 pending_buckets = 0; // buckets still to emit during a two-level flush; counts down to 0
+    size_t total_data_variants_rows = 0;
+    size_t total_data_variants_bytes = 0;
+    Poco::Logger * logger = &Poco::Logger::get("StreamingAggregatingTransform");
+    size_t input_rows = 0;
+    size_t output_rows = 0;
+
+    void prepareFlushOneTwoLevelDataVariants(); // initialises pending_buckets from the active two-level method
+    bool isMemoryOverFlow(); // compares tracked memory with max_bytes_before_external_group_by
+
+};
+
+class StreamingAggregatingStep : public DB::ITransformingStep // query plan step that installs StreamingAggregatingTransform
+{
+public:
+    explicit StreamingAggregatingStep(
+        DB::ContextPtr context_,
+        const DB::DataStream & input_stream_,
+        DB::Aggregator::Params params_);
+    ~StreamingAggregatingStep() override = default;
+
+    String getName() const override { return "StreamingAggregating"; }
+
+    void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) override;
+
+    void describeActions(DB::JSONBuilder::JSONMap & map) const override;
+    void describeActions(DB::IQueryPlanStep::FormatSettings & settings) const override;
+
+private:
+    DB::ContextPtr context;
+    DB::Aggregator::Params params;
+    void updateOutputStream() override;
+};
+}