diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp index 3ef616b6ce0f7..83c37c7ad7524 100644 --- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp +++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp @@ -16,7 +16,7 @@ */ #include "StorageJoinFromReadBuffer.h" -#include +#include #include #include #include @@ -42,7 +42,7 @@ using namespace DB; void restore(DB::ReadBuffer & in, IJoin & join, const Block & sample_block) { - NativeReader block_stream(in, 0); + local_engine::NativeReader block_stream(in); ProfileInfo info; { diff --git a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp index 1ddd5311f90e5..ad04c07000a3a 100644 --- a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp +++ b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp @@ -44,7 +44,6 @@ void NativeSplitter::split(DB::Block & block) { return; } - block = convertAggregateStateInBlock(block); if (!output_header.columns()) [[unlikely]] { if (output_columns_indicies.empty()) diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp index 957644a01ed01..0bf035a263dc5 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp +++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp @@ -39,6 +39,7 @@ void ShuffleSplitter::split(DB::Block & block) { return; } + initOutputIfNeeded(block); computeAndCountPartitionId(block); Stopwatch split_time_watch; split_time_watch.start(); @@ -74,12 +75,12 @@ SplitResult ShuffleSplitter::stop() stopped = true; return split_result; } -void ShuffleSplitter::splitBlockByPartition(DB::Block & block) + +void ShuffleSplitter::initOutputIfNeeded(Block & block) { - Stopwatch split_time_watch; - split_time_watch.start(); - if (!output_header.columns()) [[unlikely]] + if (output_header.columns() == 0) [[unlikely]] { + output_header = block.cloneEmpty(); if (output_columns_indicies.empty()) { output_header = block.cloneEmpty(); @@ -90,7 +91,7 @@ void ShuffleSplitter::splitBlockByPartition(DB::Block & block) } else { - DB::ColumnsWithTypeAndName cols; + ColumnsWithTypeAndName cols; for (const auto & index : output_columns_indicies) { cols.push_back(block.getByPosition(index)); @@ -98,6 +99,12 @@ void ShuffleSplitter::splitBlockByPartition(DB::Block & block) output_header = DB::Block(cols); } } +} + +void ShuffleSplitter::splitBlockByPartition(DB::Block & block) +{ + Stopwatch split_time_watch; + split_time_watch.start(); DB::Block out_block; for (size_t col = 0; col < output_header.columns(); ++col) { @@ -152,7 +159,7 @@ void ShuffleSplitter::spillPartition(size_t partition_id) { partition_write_buffers[partition_id] = getPartitionWriteBuffer(partition_id); partition_outputs[partition_id] - = std::make_unique(*partition_write_buffers[partition_id], 0, partition_buffer[partition_id].getHeader()); + = std::make_unique(*partition_write_buffers[partition_id], output_header); } DB::Block result = partition_buffer[partition_id].releaseColumns(); if (result.rows() > 0) diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h index 49fa967fc03eb..611c61702d4eb 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h +++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include #include @@ -101,6 +101,7 @@ class ShuffleSplitter : public ShuffleWriterBase private: void init(); + void initOutputIfNeeded(DB::Block & block); void splitBlockByPartition(DB::Block & block); void spillPartition(size_t partition_id); std::string getPartitionTempFile(size_t partition_id); @@ -111,7 +112,7 @@ class ShuffleSplitter : public ShuffleWriterBase bool stopped = false; PartitionInfo partition_info; std::vector partition_buffer; - std::vector> partition_outputs; + std::vector> partition_outputs; std::vector> partition_write_buffers; std::vector> partition_cached_write_buffers; std::vector compressed_buffers; diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp index 8fdbac37fd603..dddf0b895fdff 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp @@ -41,11 +41,11 @@ void ShuffleWriter::write(const Block & block) { if (compression_enable) { - native_writer = std::make_unique(*compressed_out, 0, block.cloneEmpty()); + native_writer = std::make_unique(*compressed_out, block.cloneEmpty()); } else { - native_writer = std::make_unique(*write_buffer, 0, block.cloneEmpty()); + native_writer = std::make_unique(*write_buffer, block.cloneEmpty()); } } if (block.rows() > 0) diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.h b/cpp-ch/local-engine/Shuffle/ShuffleWriter.h index 459bf4e93ad75..98f67d1ccadb8 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.h +++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.h @@ -16,7 +16,7 @@ */ #pragma once #include -#include +#include namespace local_engine { @@ -32,7 +32,7 @@ class ShuffleWriter private: std::unique_ptr compressed_out; std::unique_ptr write_buffer; - std::unique_ptr native_writer; + std::unique_ptr native_writer; bool compression_enable; }; } diff --git a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp index 6080e8486b5a0..f0c47b9b1f5e6 100644 --- a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp +++ b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp @@ -21,6 +21,7 @@ #include #include #include +#include namespace DB { @@ -120,7 +121,14 @@ Block NativeReader::read() /// Type String type_name; readBinary(type_name, istr); - column.type = data_type_factory.get(type_name); + bool agg_opt_column = false; + String real_type_name = type_name; + if (type_name.ends_with(NativeWriter::AGG_STATE_SUFFIX)) + { + agg_opt_column = true; + real_type_name = type_name.substr(0, type_name.length() - NativeWriter::AGG_STATE_SUFFIX.length()); + } + column.type = data_type_factory.get(real_type_name); bool is_agg_state_type = isAggregateFunction(column.type); SerializationPtr serialization = column.type->getDefaultSerialization(); @@ -130,7 +138,7 @@ Block NativeReader::read() double avg_value_size_hint = avg_value_size_hints.empty() ? 0 : avg_value_size_hints[i]; if (rows) /// If no rows, nothing to read. { - if (is_agg_state_type) + if (is_agg_state_type && agg_opt_column) { const DataTypeAggregateFunction * agg_type = checkAndGetDataType(column.type.get()); readAggData(*agg_type, read_column, istr, rows); diff --git a/cpp-ch/local-engine/Storages/IO/NativeWriter.cpp b/cpp-ch/local-engine/Storages/IO/NativeWriter.cpp index 2cbdad0ae36be..1c24f4c1a5e2e 100644 --- a/cpp-ch/local-engine/Storages/IO/NativeWriter.cpp +++ b/cpp-ch/local-engine/Storages/IO/NativeWriter.cpp @@ -24,6 +24,8 @@ using namespace DB; namespace local_engine { + +const String NativeWriter::AGG_STATE_SUFFIX= "#optagg"; void NativeWriter::flush() { ostr.next(); @@ -67,8 +69,15 @@ size_t NativeWriter::write(const DB::Block & block) auto original_type = header.safeGetByPosition(i).type; /// Type String type_name = original_type->getName(); - - writeStringBinary(type_name, ostr); + if (isAggregateFunction(original_type) + && header.safeGetByPosition(i).column->getDataType() != block.safeGetByPosition(i).column->getDataType()) + { + writeStringBinary(type_name + AGG_STATE_SUFFIX, ostr); + } + else + { + writeStringBinary(type_name, ostr); + } SerializationPtr serialization = column.type->getDefaultSerialization(); column.column = recursiveRemoveSparse(column.column); diff --git a/cpp-ch/local-engine/Storages/IO/NativeWriter.h b/cpp-ch/local-engine/Storages/IO/NativeWriter.h index 6815d89d25ddf..a958f4484dce4 100644 --- a/cpp-ch/local-engine/Storages/IO/NativeWriter.h +++ b/cpp-ch/local-engine/Storages/IO/NativeWriter.h @@ -32,12 +32,12 @@ namespace local_engine class NativeWriter { public: + static const String AGG_STATE_SUFFIX; NativeWriter( DB::WriteBuffer & ostr_, const DB::Block & header_): ostr(ostr_), header(header_) {} DB::Block getHeader() const { return header; } - /// Returns the number of bytes written. size_t write(const DB::Block & block); void flush();