diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index 3623e493bf48d..20f47d869ec24 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -303,7 +303,7 @@ void Partition::clear() blocks.clear(); } -size_t Partition::spill(DB::NativeWriter & writer) +size_t Partition::spill(NativeWriter & writer) { std::unique_lock lock(mtx, std::try_to_lock); if (lock.owns_lock()) diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h b/cpp-ch/local-engine/Shuffle/PartitionWriter.h index a0d83c194b05e..0a457e39415e7 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h @@ -48,7 +48,7 @@ class Partition void addBlock(DB::Block & block); bool empty() const; void clear(); - size_t spill(DB::NativeWriter & writer); + size_t spill(NativeWriter & writer); private: std::vector blocks; diff --git a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp index d53d565c42e72..49b8f274ad2a9 100644 --- a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp +++ b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp @@ -19,8 +19,10 @@ #include #include +#include #include #include +#include using namespace DB; @@ -58,6 +60,30 @@ DB::ColumnWithTypeAndName convertAggregateStateToFixedString(DB::ColumnWithTypeA } return DB::ColumnWithTypeAndName(std::move(res_col), res_type, col.name); } + +DB::ColumnWithTypeAndName convertAggregateStateToString(DB::ColumnWithTypeAndName col) +{ + if (!isAggregateFunction(col.type)) + { + return col; + } + const auto *aggregate_col = checkAndGetColumn(*col.column); + auto res_type = std::make_shared(); + auto res_col = res_type->createColumn(); + PaddedPODArray & column_chars = assert_cast(*res_col).getChars(); + column_chars.reserve(aggregate_col->size() * 60); + IColumn::Offsets & column_offsets = assert_cast(*res_col).getOffsets(); + auto value_writer = WriteBufferFromVector>(column_chars); + column_offsets.reserve(aggregate_col->size()); + for (const auto & item : aggregate_col->getData()) + { + aggregate_col->getAggregateFunction()->serialize(item, value_writer); + writeChar('\0', value_writer); + column_offsets.emplace_back(value_writer.count()); + } + return DB::ColumnWithTypeAndName(std::move(res_col), res_type, col.name); +} + DB::ColumnWithTypeAndName convertFixedStringToAggregateState(DB::ColumnWithTypeAndName col, DB::DataTypePtr type) { chassert(isAggregateFunction(type)); @@ -90,7 +116,7 @@ DB::Block convertAggregateStateInBlock(DB::Block block) ColumnsWithTypeAndName columns; for (const auto & item : block.getColumnsWithTypeAndName()) { - columns.emplace_back(convertAggregateStateToFixedString(item)); + columns.emplace_back(convertAggregateStateToString(item)); } return columns; } diff --git a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.h b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.h index 62d5127d8a90f..fb9bd4ae5e876 100644 --- a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.h +++ b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.h @@ -24,6 +24,8 @@ DB::Block convertAggregateStateInBlock(DB::Block block); DB::ColumnWithTypeAndName convertAggregateStateToFixedString(DB::ColumnWithTypeAndName col); +DB::ColumnWithTypeAndName convertAggregateStateToString(DB::ColumnWithTypeAndName col); + DB::ColumnWithTypeAndName convertFixedStringToAggregateState(DB::ColumnWithTypeAndName col, DB::DataTypePtr type); } diff --git a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp index f0c47b9b1f5e6..cf6d11f1695f9 100644 --- a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp +++ b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp @@ -73,9 +73,10 @@ void NativeReader::readAggData(const DB::DataTypeAggregateFunction & data_type, AggregateDataPtr place = arena.alignedAlloc(size_of_state, align_of_state); agg_function->create(place); - - auto n = istr.read(place, size_of_state); - chassert(n == size_of_state); +// UInt64 size; +// readVarUInt(size, istr); + agg_function->deserialize(place, istr); + istr.ignore(); vec.push_back(place); } } diff --git a/cpp-ch/local-engine/Storages/IO/NativeWriter.cpp b/cpp-ch/local-engine/Storages/IO/NativeWriter.cpp index 1c24f4c1a5e2e..fd7d11fd54b6e 100644 --- a/cpp-ch/local-engine/Storages/IO/NativeWriter.cpp +++ b/cpp-ch/local-engine/Storages/IO/NativeWriter.cpp @@ -19,6 +19,8 @@ #include #include #include +#include + using namespace DB; @@ -69,8 +71,9 @@ size_t NativeWriter::write(const DB::Block & block) auto original_type = header.safeGetByPosition(i).type; /// Type String type_name = original_type->getName(); - if (isAggregateFunction(original_type) - && header.safeGetByPosition(i).column->getDataType() != block.safeGetByPosition(i).column->getDataType()) + bool is_agg_opt = isAggregateFunction(original_type) + && header.safeGetByPosition(i).column->getDataType() != block.safeGetByPosition(i).column->getDataType(); + if (is_agg_opt) { writeStringBinary(type_name + AGG_STATE_SUFFIX, ostr); } @@ -83,7 +86,18 @@ size_t NativeWriter::write(const DB::Block & block) column.column = recursiveRemoveSparse(column.column); /// Data if (rows) /// Zero items of data is always represented as zero number of bytes. - writeData(*serialization, column.column, ostr, 0, 0); + { + if (is_agg_opt) + { + const auto * str_col = static_cast(column.column.get()); + const PaddedPODArray & column_chars = str_col->getChars(); + ostr.write(column_chars.raw_data(), str_col->getOffsets().back()); + } + else + { + writeData(*serialization, column.column, ostr, 0, 0); + } + } } size_t written_after = ostr.count();