diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index c933c45966412..932917362b20f 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -310,9 +310,11 @@ size_t Partition::spill(NativeWriter & writer) if (lock.owns_lock()) { size_t raw_size = 0; - for (const auto & block : blocks) + while (!blocks.empty()) { + auto & block = blocks.back(); raw_size += writer.write(block); + blocks.pop_back(); } blocks.clear(); return raw_size; diff --git a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp index 6ad5bcefc6253..84c32f4565f75 100644 --- a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp +++ b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp @@ -36,23 +36,23 @@ bool isFixedSizeStateAggregateFunction(const String& name) return function_set.contains(name); } -bool isFixedSizeArguments(DataTypes data_types) +bool isFixedSizeArguments(const DataTypes& data_types) { - return data_types.front()->isValueRepresentedByNumber(); + return removeNullable(data_types.front())->isValueRepresentedByNumber(); } -bool isFixedSizeAggregateFunction(DB::AggregateFunctionPtr function) +bool isFixedSizeAggregateFunction(const DB::AggregateFunctionPtr& function) { return isFixedSizeStateAggregateFunction(function->getName()) && isFixedSizeArguments(function->getArgumentTypes()); } -DB::ColumnWithTypeAndName convertAggregateStateToFixedString(DB::ColumnWithTypeAndName col) +DB::ColumnWithTypeAndName convertAggregateStateToFixedString(const DB::ColumnWithTypeAndName& col) { - if (!WhichDataType(col.type).isAggregateFunction()) + const auto *aggregate_col = checkAndGetColumn(*col.column); + if (!aggregate_col) { return col; } - const auto *aggregate_col = checkAndGetColumn(*col.column); // only support known fixed size aggregate function if (!isFixedSizeAggregateFunction(aggregate_col->getAggregateFunction())) { @@ -70,17 +70,16 @@ DB::ColumnWithTypeAndName convertAggregateStateToFixedString(DB::ColumnWithTypeA return DB::ColumnWithTypeAndName(std::move(res_col), res_type, col.name); } -DB::ColumnWithTypeAndName convertAggregateStateToString(DB::ColumnWithTypeAndName col) +DB::ColumnWithTypeAndName convertAggregateStateToString(const DB::ColumnWithTypeAndName& col) { - if (!WhichDataType(col.type).isAggregateFunction()) + const auto *aggregate_col = checkAndGetColumn(*col.column); + if (!aggregate_col) { return col; } - const auto *aggregate_col = checkAndGetColumn(*col.column); auto res_type = std::make_shared(); auto res_col = res_type->createColumn(); PaddedPODArray & column_chars = assert_cast(*res_col).getChars(); - column_chars.reserve(aggregate_col->size() * 60); IColumn::Offsets & column_offsets = assert_cast(*res_col).getOffsets(); auto value_writer = WriteBufferFromVector>(column_chars); column_offsets.reserve(aggregate_col->size()); @@ -93,7 +92,7 @@ DB::ColumnWithTypeAndName convertAggregateStateToString(DB::ColumnWithTypeAndNam return DB::ColumnWithTypeAndName(std::move(res_col), res_type, col.name); } -DB::ColumnWithTypeAndName convertFixedStringToAggregateState(DB::ColumnWithTypeAndName col, DB::DataTypePtr type) +DB::ColumnWithTypeAndName convertFixedStringToAggregateState(const DB::ColumnWithTypeAndName & col, const DB::DataTypePtr & type) { chassert(WhichDataType(type).isAggregateFunction()); auto res_col = type->createColumn(); @@ -101,7 +100,6 @@ DB::ColumnWithTypeAndName convertFixedStringToAggregateState(DB::ColumnWithTypeA ColumnAggregateFunction & real_column = typeid_cast(*res_col); auto & arena = real_column.createOrGetArena(); ColumnAggregateFunction::Container & vec = real_column.getData(); - vec.reserve(col.column->size()); auto agg_function = agg_type->getFunction(); size_t size_of_state = agg_function->sizeOfData(); @@ -120,9 +118,10 @@ DB::ColumnWithTypeAndName convertFixedStringToAggregateState(DB::ColumnWithTypeA } return DB::ColumnWithTypeAndName(std::move(res_col), type, col.name); } -DB::Block convertAggregateStateInBlock(DB::Block block) +DB::Block convertAggregateStateInBlock(DB::Block& block) { ColumnsWithTypeAndName columns; + columns.reserve(block.columns()); for (const auto & item : block.getColumnsWithTypeAndName()) { if (WhichDataType(item.type).isAggregateFunction()) diff --git a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.h b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.h index 6df1ad2821d2e..6536982ef5728 100644 --- a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.h +++ b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.h @@ -21,15 +21,15 @@ namespace local_engine { -bool isFixedSizeAggregateFunction(DB::AggregateFunctionPtr function); +bool isFixedSizeAggregateFunction(const DB::AggregateFunctionPtr & function); -DB::Block convertAggregateStateInBlock(DB::Block block); +DB::Block convertAggregateStateInBlock(DB::Block& block); -DB::ColumnWithTypeAndName convertAggregateStateToFixedString(DB::ColumnWithTypeAndName col); +DB::ColumnWithTypeAndName convertAggregateStateToFixedString(const DB::ColumnWithTypeAndName & col); -DB::ColumnWithTypeAndName convertAggregateStateToString(DB::ColumnWithTypeAndName col); +DB::ColumnWithTypeAndName convertAggregateStateToString(const DB::ColumnWithTypeAndName & col); -DB::ColumnWithTypeAndName convertFixedStringToAggregateState(DB::ColumnWithTypeAndName col, DB::DataTypePtr type); +DB::ColumnWithTypeAndName convertFixedStringToAggregateState(const DB::ColumnWithTypeAndName & col, const DB::DataTypePtr & type); }