diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
index 38192a32b465..0814c3c8c7d7 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
@@ -290,4 +290,44 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
       }
     }
   }
+
+  test("collect_set") {
+    val sql =
+      """
+        |select a, b from (
+        |select n_regionkey as a, collect_set(if(n_regionkey=0, n_name, null))
+        | as set from nation group by n_regionkey)
+        |lateral view explode(set) as b
+        |order by a, b
+        |""".stripMargin
+    runQueryAndCompare(sql)(checkOperatorMatch[CHHashAggregateExecTransformer])
+  }
+
+  test("test 'function from_unixtime'") {
+    val df = runQueryAndCompare(
+      "select l_orderkey, from_unixtime(l_orderkey, 'yyyy-MM-dd HH:mm:ss') " +
+        "from lineitem order by l_orderkey desc limit 10"
+    )(checkOperatorMatch[ProjectExecTransformer])
+    checkLengthAndPlan(df, 10)
+  }
+
+  test("test max string") {
+    withSQLConf(("spark.gluten.sql.columnar.force.hashagg", "true")) {
+      val sql =
+        """
+          |SELECT
+          |  l_returnflag,
+          |  l_linestatus,
+          |  max(l_comment)
+          |FROM
+          |  lineitem
+          |WHERE
+          |  l_shipdate <= date'1998-09-02' - interval 1 day
+          |GROUP BY
+          |  l_returnflag,
+          |  l_linestatus
+          |""".stripMargin
+      runQueryAndCompare(sql, noFallBack = false) { df => }
+    }
+  }
 }
diff --git a/cpp-ch/local-engine/Builder/SerializedPlanBuilder.cpp b/cpp-ch/local-engine/Builder/SerializedPlanBuilder.cpp
index 22499794a500..92e5c564110d 100644
--- a/cpp-ch/local-engine/Builder/SerializedPlanBuilder.cpp
+++ b/cpp-ch/local-engine/Builder/SerializedPlanBuilder.cpp
@@ -259,7 +259,7 @@ std::shared_ptr SerializedPlanBuilder::buildType(const DB::Data
         res->mutable_i32()->set_nullability(type_nullability);
     else if (which.isInt64())
         res->mutable_i64()->set_nullability(type_nullability);
-    else if (which.isString() || which.isAggregateFunction())
+    else if (which.isStringOrFixedString() || which.isAggregateFunction())
         res->mutable_binary()->set_nullability(type_nullability); /// Spark Binary type is more similar to CH String type
     else if (which.isFloat32())
         res->mutable_fp32()->set_nullability(type_nullability);
diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
index 3ef616b6ce0f..83c37c7ad752 100644
--- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
+++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
@@ -16,7 +16,7 @@
  */
 #include "StorageJoinFromReadBuffer.h"
-#include <Formats/NativeReader.h>
+#include <Storages/IO/NativeReader.h>
 #include 
 #include 
 #include 
@@ -42,7 +42,7 @@ using namespace DB;
 
 void restore(DB::ReadBuffer & in, IJoin & join, const Block & sample_block)
 {
-    NativeReader block_stream(in, 0);
+    local_engine::NativeReader block_stream(in);
     ProfileInfo info;
     {
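For orientation, this is roughly how the patched restore() drives the new reader. The per-block consumer is a stand-in (the join-insertion call sits outside the hunk above); only the reader usage is taken from this patch. A sketch, not the PR's code:

```cpp
#include <Storages/IO/NativeReader.h>

// Hypothetical helper: local_engine::NativeReader no longer takes a
// server-revision argument, and read() returns an empty block at EOF.
template <typename Consumer>
void restoreBlocks(DB::ReadBuffer & in, Consumer && consume)
{
    local_engine::NativeReader block_stream(in);
    while (DB::Block block = block_stream.read())
        consume(std::move(block));
}
```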
diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
index 676231e048b9..16d56e9bb8e8 100644
--- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
@@ -17,6 +17,7 @@
 #include "CachedShuffleWriter.h"
 #include 
 #include 
+#include <Storages/IO/AggregateSerializationUtils.h>
 #include 
 #include 
 #include 
@@ -94,6 +95,10 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions
 void CachedShuffleWriter::split(DB::Block & block)
 {
     initOutputIfNeeded(block);
+    Stopwatch split_time_watch;
+    split_time_watch.start();
+    block = convertAggregateStateInBlock(block);
+    split_result.total_split_time += split_time_watch.elapsedNanoseconds();
     Stopwatch compute_pid_time_watch;
     compute_pid_time_watch.start();
@@ -105,7 +110,6 @@ void CachedShuffleWriter::split(DB::Block & block)
     {
         out_block.insert(block.getByPosition(output_columns_indicies[col]));
     }
-
     partition_writer->write(partition_info, out_block);
     if (options.spill_threshold > 0 && partition_writer->totalCacheSize() > options.spill_threshold)
diff --git a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp
index eec4e05fffb9..d05f6633ca8a 100644
--- a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp
+++ b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp
@@ -30,6 +30,7 @@
 #include 
 #include 
 #include 
+#include 
 
 namespace local_engine
 {
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index 7a6bcb78ccf1..932917362b20 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -29,6 +29,7 @@
 #include 
 #include 
 #include 
+#include <Storages/IO/NativeWriter.h>
 
 using namespace DB;
@@ -56,7 +57,7 @@ void local_engine::PartitionWriter::write(const PartitionInfo & partition_info, D
     if (buffer.size() >= shuffle_writer->options.split_size)
     {
         Block block = buffer.releaseColumns();
-        auto bytes = block.bytes();
+        auto bytes = block.allocatedBytes();
         total_partition_buffer_size += bytes;
         shuffle_writer->split_result.raw_partition_length[i] += bytes;
         partition_buffer[i].addBlock(block);
@@ -73,7 +74,7 @@ void LocalPartitionWriter::evictPartitions(bool for_memory_spill)
     WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size);
     auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
     CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
-    NativeWriter writer(compressed_output, 0, shuffle_writer->output_header);
+    NativeWriter writer(compressed_output, shuffle_writer->output_header);
     SpillInfo info;
     info.spilled_file = file;
     size_t partition_id = 0;
@@ -122,7 +123,7 @@ std::vector LocalPartitionWriter::mergeSpills(WriteBuffer & data_file)
 {
     auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
     CompressedWriteBuffer compressed_output(data_file, codec, shuffle_writer->options.io_buffer_size);
-    NativeWriter writer(compressed_output, 0, shuffle_writer->output_header);
+    NativeWriter writer(compressed_output, shuffle_writer->output_header);
     std::vector partition_length;
     partition_length.resize(shuffle_writer->options.partition_nums, 0);
@@ -130,7 +131,8 @@ std::vector LocalPartitionWriter::mergeSpills(WriteBuffer & data_file)
     spill_inputs.reserve(spill_infos.size());
     for (const auto & spill : spill_infos)
     {
-        spill_inputs.emplace_back(std::make_shared(spill.spilled_file, shuffle_writer->options.io_buffer_size));
+        // Buffer size 0: reads bypass buffering and go straight through readBig().
+        spill_inputs.emplace_back(std::make_shared(spill.spilled_file, 0));
     }
     Stopwatch write_time_watch;
@@ -229,7 +231,7 @@ void CelebornPartitionWriter::evictPartitions(bool for_memory_spill)
     WriteBufferFromOwnString output;
     auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
     CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
-    NativeWriter writer(compressed_output, 0, shuffle_writer->output_header);
+    NativeWriter writer(compressed_output, shuffle_writer->output_header);
     size_t raw_size = partition.spill(writer);
     compressed_output.sync();
     Stopwatch push_time_watch;
@@ -302,15 +304,17 @@ void Partition::clear()
     blocks.clear();
 }
 
-size_t Partition::spill(DB::NativeWriter & writer)
+size_t Partition::spill(NativeWriter & writer)
 {
     std::unique_lock lock(mtx, std::try_to_lock);
     if (lock.owns_lock())
     {
         size_t raw_size = 0;
-        for (const auto & block : blocks)
+        while (!blocks.empty())
         {
+            auto & block = blocks.back();
             raw_size += writer.write(block);
+            blocks.pop_back();
         }
         blocks.clear();
         return raw_size;
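Both shuffle writers now normalize aggregate-state columns before any rows are buffered or sliced. A minimal sketch of that pre-split step, using only the helper introduced later in this patch:

```cpp
#include <Storages/IO/AggregateSerializationUtils.h>

// Fixed-size states (min/max/sum/count/avg over numeric inputs) are copied
// byte-for-byte into FixedString(sizeOfData()); every other state is
// serialized row by row into a plain String column. Downstream split code
// then only ever sees ordinary columns it already knows how to cut and buffer.
DB::Block prepareForSplit(DB::Block block)
{
    return local_engine::convertAggregateStateInBlock(block);
}
```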
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h b/cpp-ch/local-engine/Shuffle/PartitionWriter.h
index a0d83c194b05..0a457e39415e 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h
@@ -48,7 +48,7 @@ class Partition
     void addBlock(DB::Block & block);
     bool empty() const;
     void clear();
-    size_t spill(DB::NativeWriter & writer);
+    size_t spill(NativeWriter & writer);
 
 private:
     std::vector blocks;
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp b/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp
index 32c9e4cecf08..50165ca6661a 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp
+++ b/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp
@@ -37,11 +37,11 @@ local_engine::ShuffleReader::ShuffleReader(std::unique_ptr in_, bool
     {
         compressed_in = std::make_unique(*in);
         configureCompressedReadBuffer(static_cast(*compressed_in));
-        input_stream = std::make_unique<NativeReader>(*compressed_in, 0);
+        input_stream = std::make_unique<NativeReader>(*compressed_in);
     }
     else
     {
-        input_stream = std::make_unique<NativeReader>(*in, 0);
+        input_stream = std::make_unique<NativeReader>(*in);
     }
 }
 Block * local_engine::ShuffleReader::read()
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleReader.h b/cpp-ch/local-engine/Shuffle/ShuffleReader.h
index fccc2b0e5755..082e75a26ca6 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleReader.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleReader.h
@@ -19,6 +19,7 @@
 #include 
 #include 
 #include 
+#include <Storages/IO/NativeReader.h>
 
 namespace DB
 {
@@ -42,7 +43,7 @@ class ShuffleReader : BlockIterator
 private:
     std::unique_ptr in;
     std::unique_ptr compressed_in;
-    std::unique_ptr<DB::NativeReader> input_stream;
+    std::unique_ptr<NativeReader> input_stream;
     DB::Block header;
 };
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp
index 266c343a59e7..e45b9d32218e 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp
+++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp
@@ -21,7 +21,7 @@
 #include 
 #include 
 #include 
-#include <Formats/NativeWriter.h>
+#include <Storages/IO/NativeWriter.h>
 #include 
 #include 
 #include 
@@ -39,7 +39,12 @@ void ShuffleSplitter::split(DB::Block & block)
     {
         return;
     }
+    initOutputIfNeeded(block);
     computeAndCountPartitionId(block);
+    Stopwatch split_time_watch;
+    split_time_watch.start();
+    block = convertAggregateStateInBlock(block);
+    split_result.total_split_time += split_time_watch.elapsedNanoseconds();
     splitBlockByPartition(block);
 }
 SplitResult ShuffleSplitter::stop()
@@ -70,12 +75,12 @@ SplitResult ShuffleSplitter::stop()
     stopped = true;
     return split_result;
 }
-void ShuffleSplitter::splitBlockByPartition(DB::Block & block)
+
+void ShuffleSplitter::initOutputIfNeeded(Block & block)
 {
-    Stopwatch split_time_watch;
-    split_time_watch.start();
-    if (!output_header.columns()) [[unlikely]]
+    if (output_header.columns() == 0) [[unlikely]]
     {
+        output_header = block.cloneEmpty();
         if (output_columns_indicies.empty())
         {
             output_header = block.cloneEmpty();
@@ -86,7 +91,7 @@ void ShuffleSplitter::splitBlockByPartition(DB::Block & block)
     }
     else
     {
-        DB::ColumnsWithTypeAndName cols;
+        ColumnsWithTypeAndName cols;
         for (const auto & index : output_columns_indicies)
         {
             cols.push_back(block.getByPosition(index));
@@ -94,6 +99,12 @@ void ShuffleSplitter::splitBlockByPartition(DB::Block & block)
         output_header = DB::Block(cols);
     }
 }
+}
+
+void ShuffleSplitter::splitBlockByPartition(DB::Block & block)
+{
+    Stopwatch split_time_watch;
+    split_time_watch.start();
     DB::Block out_block;
     for (size_t col = 0; col < output_header.columns(); ++col)
     {
@@ -148,7 +159,7 @@ void ShuffleSplitter::spillPartition(size_t partition_id)
     {
         partition_write_buffers[partition_id] = getPartitionWriteBuffer(partition_id);
         partition_outputs[partition_id]
            = std::make_unique<NativeWriter>(*partition_write_buffers[partition_id], 0, partition_buffer[partition_id].getHeader());
     }
     DB::Block result = partition_buffer[partition_id].releaseColumns();
     if (result.rows() > 0)
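Note the ordering above: initOutputIfNeeded() runs before the state conversion, so output_header keeps the original AggregateFunction types while the block itself ends up carrying String/FixedString columns. That type mismatch is exactly what the new NativeWriter uses to decide which columns to tag. A sketch of the interplay (`out` is any DB::WriteBuffer; the function name is illustrative):

```cpp
#include <Storages/IO/AggregateSerializationUtils.h>
#include <Storages/IO/NativeWriter.h>

// The header passed to the writer must hold the pre-conversion types:
// NativeWriter::write() compares the header type against the block's column
// type and only appends the "#optagg" suffix when they differ.
size_t writeConverted(DB::WriteBuffer & out, const DB::Block & header, DB::Block block)
{
    local_engine::NativeWriter writer(out, header);
    block = local_engine::convertAggregateStateInBlock(block);
    return writer.write(block); // returns bytes written
}
```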
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
index e9a59c339cae..aad53508b81b 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
@@ -18,7 +18,7 @@
 #include 
 #include 
 #include 
-#include <Formats/NativeWriter.h>
+#include <Storages/IO/NativeWriter.h>
 #include 
 #include 
 #include 
@@ -101,6 +101,7 @@ class ShuffleSplitter : public ShuffleWriterBase
 private:
     void init();
+    void initOutputIfNeeded(DB::Block & block);
     void splitBlockByPartition(DB::Block & block);
     void spillPartition(size_t partition_id);
     std::string getPartitionTempFile(size_t partition_id);
@@ -111,7 +112,7 @@ class ShuffleSplitter : public ShuffleWriterBase
     bool stopped = false;
     PartitionInfo partition_info;
     std::vector partition_buffer;
-    std::vector<std::unique_ptr<DB::NativeWriter>> partition_outputs;
+    std::vector<std::unique_ptr<NativeWriter>> partition_outputs;
     std::vector> partition_write_buffers;
     std::vector> partition_cached_write_buffers;
     std::vector compressed_buffers;
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
index 8fdbac37fd60..dddf0b895fdf 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
@@ -41,11 +41,11 @@ void ShuffleWriter::write(const Block & block)
     {
         if (compression_enable)
         {
-            native_writer = std::make_unique<NativeWriter>(*compressed_out, 0, block.cloneEmpty());
+            native_writer = std::make_unique<NativeWriter>(*compressed_out, block.cloneEmpty());
         }
         else
         {
-            native_writer = std::make_unique<NativeWriter>(*write_buffer, 0, block.cloneEmpty());
+            native_writer = std::make_unique<NativeWriter>(*write_buffer, block.cloneEmpty());
         }
     }
     if (block.rows() > 0)
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.h b/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
index 459bf4e93ad7..98f67d1ccadb 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
@@ -16,7 +16,7 @@
  */
 #pragma once
 #include 
-#include <Formats/NativeWriter.h>
+#include <Storages/IO/NativeWriter.h>
 
 namespace local_engine
 {
@@ -32,7 +32,7 @@ class ShuffleWriter
 private:
     std::unique_ptr compressed_out;
     std::unique_ptr write_buffer;
-    std::unique_ptr<DB::NativeWriter> native_writer;
+    std::unique_ptr<NativeWriter> native_writer;
     bool compression_enable;
 };
 }
diff --git a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp
new file mode 100644
index 000000000000..84c32f4565f7
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AggregateSerializationUtils.h"
+#include 
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+
+using namespace DB;
+
+namespace local_engine
+{
+
+bool isFixedSizeStateAggregateFunction(const String & name)
+{
+    static const std::set<String> function_set = {"min", "max", "sum", "count", "avg"};
+    return function_set.contains(name);
+}
+
+bool isFixedSizeArguments(const DataTypes & data_types)
+{
+    return removeNullable(data_types.front())->isValueRepresentedByNumber();
+}
+
+bool isFixedSizeAggregateFunction(const DB::AggregateFunctionPtr & function)
+{
+    return isFixedSizeStateAggregateFunction(function->getName()) && isFixedSizeArguments(function->getArgumentTypes());
+}
+
+DB::ColumnWithTypeAndName convertAggregateStateToFixedString(const DB::ColumnWithTypeAndName & col)
+{
+    const auto * aggregate_col = checkAndGetColumn<ColumnAggregateFunction>(*col.column);
+    if (!aggregate_col)
+    {
+        return col;
+    }
+    // only support known fixed size aggregate function
+    if (!isFixedSizeAggregateFunction(aggregate_col->getAggregateFunction()))
+    {
+        return col;
+    }
+    size_t state_size = aggregate_col->getAggregateFunction()->sizeOfData();
+    auto res_type = std::make_shared<DataTypeFixedString>(state_size);
+    auto res_col = res_type->createColumn();
+    PaddedPODArray<UInt8> & column_chars_t = assert_cast<ColumnFixedString &>(*res_col).getChars();
+    column_chars_t.reserve(aggregate_col->size() * state_size);
+    for (const auto & item : aggregate_col->getData())
+    {
+        column_chars_t.insert_assume_reserved(item, item + state_size);
+    }
+    return DB::ColumnWithTypeAndName(std::move(res_col), res_type, col.name);
+}
+
+DB::ColumnWithTypeAndName convertAggregateStateToString(const DB::ColumnWithTypeAndName & col)
+{
+    const auto * aggregate_col = checkAndGetColumn<ColumnAggregateFunction>(*col.column);
+    if (!aggregate_col)
+    {
+        return col;
+    }
+    auto res_type = std::make_shared<DataTypeString>();
+    auto res_col = res_type->createColumn();
+    PaddedPODArray<UInt8> & column_chars = assert_cast<ColumnString &>(*res_col).getChars();
+    IColumn::Offsets & column_offsets = assert_cast<ColumnString &>(*res_col).getOffsets();
+    auto value_writer = WriteBufferFromVector<PaddedPODArray<UInt8>>(column_chars);
+    column_offsets.reserve(aggregate_col->size());
+    for (const auto & item : aggregate_col->getData())
+    {
+        aggregate_col->getAggregateFunction()->serialize(item, value_writer);
+        writeChar('\0', value_writer);
+        column_offsets.emplace_back(value_writer.count());
+    }
+    return DB::ColumnWithTypeAndName(std::move(res_col), res_type, col.name);
+}
+
+DB::ColumnWithTypeAndName convertFixedStringToAggregateState(const DB::ColumnWithTypeAndName & col, const DB::DataTypePtr & type)
+{
+    chassert(WhichDataType(type).isAggregateFunction());
+    auto res_col = type->createColumn();
+    const auto * agg_type = checkAndGetDataType<DataTypeAggregateFunction>(type.get());
+    ColumnAggregateFunction & real_column = typeid_cast<ColumnAggregateFunction &>(*res_col);
+    auto & arena = real_column.createOrGetArena();
+    ColumnAggregateFunction::Container & vec = real_column.getData();
+    vec.reserve(col.column->size());
+    auto agg_function = agg_type->getFunction();
+    size_t size_of_state = agg_function->sizeOfData();
+    size_t align_of_state = agg_function->alignOfData();
+
+    for (size_t i = 0; i < col.column->size(); ++i)
+    {
+        AggregateDataPtr place = arena.alignedAlloc(size_of_state, align_of_state);
+
+        agg_function->create(place);
+
+        auto value = col.column->getDataAt(i);
+        memcpy(place, value.data, value.size);
+
+        vec.push_back(place);
+    }
+    return DB::ColumnWithTypeAndName(std::move(res_col), type, col.name);
+}
+
+DB::Block convertAggregateStateInBlock(DB::Block & block)
+{
+    ColumnsWithTypeAndName columns;
+    columns.reserve(block.columns());
+    for (const auto & item : block.getColumnsWithTypeAndName())
+    {
+        if (WhichDataType(item.type).isAggregateFunction())
+        {
+            const auto * aggregate_col = checkAndGetColumn<ColumnAggregateFunction>(*item.column);
+            if (isFixedSizeAggregateFunction(aggregate_col->getAggregateFunction()))
+                columns.emplace_back(convertAggregateStateToFixedString(item));
+            else
+                columns.emplace_back(convertAggregateStateToString(item));
+        }
+        else
+        {
+            columns.emplace_back(item);
+        }
+    }
+    return columns;
+}
+}
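A round-trip sketch for the fixed-size path, assuming `col` holds states of an aggregate function that passes isFixedSizeAggregateFunction(). Both sides must agree on the raw state layout, which is why only the allow-listed functions (min/max/sum/count/avg over value-representable types) take this path:

```cpp
#include <Storages/IO/AggregateSerializationUtils.h>

// Out: states are copied byte-for-byte into a FixedString(sizeOfData()) column.
// In:  each FixedString value is memcpy'd back into an arena-allocated state.
DB::ColumnWithTypeAndName roundTrip(const DB::ColumnWithTypeAndName & col)
{
    auto wire = local_engine::convertAggregateStateToFixedString(col);
    return local_engine::convertFixedStringToAggregateState(wire, col.type);
}
```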
diff --git a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.h b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.h
new file mode 100644
index 000000000000..6536982ef572
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.h
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include 
+#include 
+#include 
+
+namespace local_engine {
+
+bool isFixedSizeAggregateFunction(const DB::AggregateFunctionPtr & function);
+
+DB::Block convertAggregateStateInBlock(DB::Block & block);
+
+DB::ColumnWithTypeAndName convertAggregateStateToFixedString(const DB::ColumnWithTypeAndName & col);
+
+DB::ColumnWithTypeAndName convertAggregateStateToString(const DB::ColumnWithTypeAndName & col);
+
+DB::ColumnWithTypeAndName convertFixedStringToAggregateState(const DB::ColumnWithTypeAndName & col, const DB::DataTypePtr & type);
+
+}
diff --git a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp
new file mode 100644
index 000000000000..12955579c624
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "NativeReader.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int INCORRECT_INDEX;
+    extern const int LOGICAL_ERROR;
+    extern const int CANNOT_READ_ALL_DATA;
+    extern const int INCORRECT_DATA;
+    extern const int TOO_LARGE_ARRAY_SIZE;
+}
+}
+
+using namespace DB;
+
+namespace local_engine
+{
+void NativeReader::readData(const ISerialization & serialization, ColumnPtr & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
+{
+    ISerialization::DeserializeBinaryBulkSettings settings;
+    settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { return &istr; };
+    settings.avg_value_size_hint = avg_value_size_hint;
+    settings.position_independent_encoding = false;
+    settings.native_format = true;
+
+    ISerialization::DeserializeBinaryBulkStatePtr state;
+
+    serialization.deserializeBinaryBulkStatePrefix(settings, state);
+    serialization.deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state, nullptr);
+
+    if (column->size() != rows)
+        throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
+            "Cannot read all data in NativeReader. Rows read: {}. Rows expected: {}", column->size(), rows);
+}
+
+template <bool FIXED>
+void NativeReader::readAggData(const DB::DataTypeAggregateFunction & data_type, DB::ColumnPtr & column, DB::ReadBuffer & istr, size_t rows)
+{
+    ColumnAggregateFunction & real_column = typeid_cast<ColumnAggregateFunction &>(*column->assumeMutable());
+    auto & arena = real_column.createOrGetArena();
+    ColumnAggregateFunction::Container & vec = real_column.getData();
+
+    vec.reserve(rows);
+    auto agg_function = data_type.getFunction();
+    size_t size_of_state = agg_function->sizeOfData();
+    size_t align_of_state = agg_function->alignOfData();
+
+    for (size_t i = 0; i < rows; ++i)
+    {
+        AggregateDataPtr place = arena.alignedAlloc(size_of_state, align_of_state);
+        agg_function->create(place);
+        if constexpr (FIXED)
+        {
+            auto n = istr.read(place, size_of_state);
+            chassert(n == size_of_state);
+        }
+        else
+        {
+            agg_function->deserialize(place, istr, std::nullopt, &arena);
+            istr.ignore();
+        }
+
+        vec.push_back(place);
+    }
+}
+
+
+Block NativeReader::getHeader() const
+{
+    return header;
+}
+
+Block NativeReader::read()
+{
+    Block res;
+
+    const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
+
+    if (istr.eof())
+    {
+        return res;
+    }
+
+    /// Dimensions
+    size_t columns = 0;
+    size_t rows = 0;
+
+    readVarUInt(columns, istr);
+    readVarUInt(rows, istr);
+
+    if (columns > 1'000'000uz)
+        throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Suspiciously many columns in Native format: {}", columns);
+    if (rows > 1'000'000'000'000uz)
+        throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Suspiciously many rows in Native format: {}", rows);
+
+    if (columns == 0 && !header && rows != 0)
+        throw Exception(ErrorCodes::INCORRECT_DATA, "Zero columns but {} rows in Native format.", rows);
+
+    for (size_t i = 0; i < columns; ++i)
+    {
+        ColumnWithTypeAndName column;
+
+        column.name = "col_" + std::to_string(i);
+
+        /// Type
+        String type_name;
+        readBinary(type_name, istr);
+        bool agg_opt_column = false;
+        String real_type_name = type_name;
+        if (type_name.ends_with(NativeWriter::AGG_STATE_SUFFIX))
+        {
+            agg_opt_column = true;
+            real_type_name = type_name.substr(0, type_name.length() - NativeWriter::AGG_STATE_SUFFIX.length());
+        }
+        column.type = data_type_factory.get(real_type_name);
+        bool is_agg_state_type = WhichDataType(column.type).isAggregateFunction();
+        SerializationPtr serialization = column.type->getDefaultSerialization();
+
+        /// Data
+        ColumnPtr read_column = column.type->createColumn(*serialization);
+
+        double avg_value_size_hint = avg_value_size_hints.empty() ? 0 : avg_value_size_hints[i];
+        if (rows) /// If no rows, nothing to read.
+        {
+            if (is_agg_state_type && agg_opt_column)
+            {
+                const DataTypeAggregateFunction * agg_type = checkAndGetDataType<DataTypeAggregateFunction>(column.type.get());
+                bool fixed = isFixedSizeAggregateFunction(agg_type->getFunction());
+                if (fixed)
+                {
+                    readAggData<true>(*agg_type, read_column, istr, rows);
+                }
+                else
+                {
+                    readAggData<false>(*agg_type, read_column, istr, rows);
+                }
+            }
+            else
+            {
+                readData(*serialization, read_column, istr, rows, avg_value_size_hint);
+            }
+        }
+        column.column = std::move(read_column);
+
+        res.insert(std::move(column));
+    }
+
+    if (res.rows() != rows)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Row count mismatch after deserialization, got: {}, expected: {}", res.rows(), rows);
+
+    return res;
+}
+
+}
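The wire format this reader understands is deliberately small: varint column and row counts, then per column a type name followed by the bulk data. Column names are never serialized; the reader synthesizes col_0, col_1, and so on. The only extension over plain Native is the type-name tag, which round-trips like this:

```cpp
#include <string>
#include <utility>

// "#optagg" is NativeWriter::AGG_STATE_SUFFIX: the writer appends it to
// AggregateFunction type names whose payload was re-encoded, and the reader
// strips it to recover the real type (and the hint to use readAggData).
std::pair<std::string, bool> decodeTypeName(const std::string & wire_name)
{
    static const std::string suffix = "#optagg";
    if (wire_name.ends_with(suffix))
        return {wire_name.substr(0, wire_name.size() - suffix.size()), true};
    return {wire_name, false};
}
```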
diff --git a/cpp-ch/local-engine/Storages/IO/NativeReader.h b/cpp-ch/local-engine/Storages/IO/NativeReader.h
new file mode 100644
index 000000000000..d065fce347d4
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/IO/NativeReader.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include 
+#include 
+#include 
+
+namespace local_engine
+{
+
+class NativeReader
+{
+public:
+    NativeReader(DB::ReadBuffer & istr_) : istr(istr_) {}
+
+    static void readData(const DB::ISerialization & serialization, DB::ColumnPtr & column, DB::ReadBuffer & istr, size_t rows, double avg_value_size_hint);
+    template <bool FIXED>
+    static void readAggData(const DB::DataTypeAggregateFunction & data_type, DB::ColumnPtr & column, DB::ReadBuffer & istr, size_t rows);
+
+    DB::Block getHeader() const;
+
+    DB::Block read();
+
+private:
+    DB::ReadBuffer & istr;
+    DB::Block header;
+
+    DB::PODArray<double> avg_value_size_hints;
+
+    void updateAvgValueSizeHints(const DB::Block & block);
+};
+
+}
diff --git a/cpp-ch/local-engine/Storages/IO/NativeWriter.cpp b/cpp-ch/local-engine/Storages/IO/NativeWriter.cpp
new file mode 100644
index 000000000000..39a0cb7b579b
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/IO/NativeWriter.cpp
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "NativeWriter.h"
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+
+using namespace DB;
+
+namespace local_engine
+{
+
+const String NativeWriter::AGG_STATE_SUFFIX = "#optagg";
+void NativeWriter::flush()
+{
+    ostr.next();
+}
+
+static void writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit)
+{
+    /** If there are columns-constants - then we materialize them.
+      * (Since the data type does not know how to serialize / deserialize constants.)
+      */
+    ColumnPtr full_column = column->convertToFullColumnIfConst();
+
+    ISerialization::SerializeBinaryBulkSettings settings;
+    settings.getter = [&ostr](ISerialization::SubstreamPath) -> WriteBuffer * { return &ostr; };
+    settings.position_independent_encoding = false;
+    settings.low_cardinality_max_dictionary_size = 0;
+
+    ISerialization::SerializeBinaryBulkStatePtr state;
+    serialization.serializeBinaryBulkStatePrefix(*full_column, settings, state);
+    serialization.serializeBinaryBulkWithMultipleStreams(*full_column, offset, limit, settings, state);
+    serialization.serializeBinaryBulkStateSuffix(settings, state);
+}
+
+size_t NativeWriter::write(const DB::Block & block)
+{
+    size_t written_before = ostr.count();
+
+    block.checkNumberOfRows();
+
+    /// Dimensions
+    size_t columns = block.columns();
+    size_t rows = block.rows();
+
+    writeVarUInt(columns, ostr);
+    writeVarUInt(rows, ostr);
+
+    for (size_t i = 0; i < columns; ++i)
+    {
+        auto column = block.safeGetByPosition(i);
+        /// Agg state columns were converted to String/FixedString before the shuffle,
+        /// so the actual aggregate state type must be written for the reader.
+        auto original_type = header.safeGetByPosition(i).type;
+        /// Type
+        String type_name = original_type->getName();
+        bool is_agg_opt = WhichDataType(original_type).isAggregateFunction()
+            && header.safeGetByPosition(i).column->getDataType() != block.safeGetByPosition(i).column->getDataType();
+        if (is_agg_opt)
+        {
+            writeStringBinary(type_name + AGG_STATE_SUFFIX, ostr);
+        }
+        else
+        {
+            writeStringBinary(type_name, ostr);
+        }
+
+        SerializationPtr serialization = column.type->getDefaultSerialization();
+        column.column = recursiveRemoveSparse(column.column);
+        /// Data
+        if (rows) /// Zero items of data is always represented as zero number of bytes.
+        {
+            const auto * agg_type = checkAndGetDataType<DataTypeAggregateFunction>(original_type.get());
+            if (is_agg_opt && agg_type && !isFixedSizeAggregateFunction(agg_type->getFunction()))
+            {
+                const auto * str_col = static_cast<const ColumnString *>(column.column.get());
+                const PaddedPODArray<UInt8> & column_chars = str_col->getChars();
+                ostr.write(column_chars.raw_data(), str_col->getOffsets().back());
+            }
+            else
+            {
+                writeData(*serialization, column.column, ostr, 0, 0);
+            }
+        }
+    }
+
+    size_t written_after = ostr.count();
+    size_t written_size = written_after - written_before;
+    return written_size;
+}
+}
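Writer and reader are designed as a matched pair. A usage sketch with in-memory buffers (standard ClickHouse IO types); passing block.cloneEmpty() as the header keeps every column on the plain path, whereas passing a pre-conversion header together with a converted block exercises the "#optagg" path instead:

```cpp
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <Storages/IO/NativeReader.h>
#include <Storages/IO/NativeWriter.h>

// Spill one block into a string and read it back; returns bytes written.
size_t spillAndRestore(const DB::Block & block, DB::Block & restored)
{
    DB::WriteBufferFromOwnString out;
    local_engine::NativeWriter writer(out, block.cloneEmpty()); // header from the block itself
    size_t written = writer.write(block);
    writer.flush();

    DB::ReadBufferFromString in(out.str());
    local_engine::NativeReader reader(in);
    restored = reader.read(); // an empty block would signal EOF
    return written;
}
```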
diff --git a/cpp-ch/local-engine/Storages/IO/NativeWriter.h b/cpp-ch/local-engine/Storages/IO/NativeWriter.h
new file mode 100644
index 000000000000..a958f4484dce
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/IO/NativeWriter.h
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include 
+#include 
+#include 
+
+namespace DB
+{
+class WriteBuffer;
+class CompressedWriteBuffer;
+}
+
+namespace local_engine
+{
+
+class NativeWriter
+{
+public:
+    static const String AGG_STATE_SUFFIX;
+    NativeWriter(
+        DB::WriteBuffer & ostr_, const DB::Block & header_) : ostr(ostr_), header(header_)
+    {}
+
+    DB::Block getHeader() const { return header; }
+    /// Returns the number of bytes written.
+    size_t write(const DB::Block & block);
+    void flush();
+
+private:
+    DB::WriteBuffer & ostr;
+    DB::Block header;
+};
+}
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/GlutenAdaptiveQueryExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/GlutenAdaptiveQueryExecSuite.scala
index 0e85bf546e7c..3ac53799f3f9 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/GlutenAdaptiveQueryExecSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/GlutenAdaptiveQueryExecSuite.scala
@@ -437,8 +437,10 @@ class GlutenAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQL
   test("gluten Exchange reuse") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "100",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
+      // Magic threshold: with 100 the CH backend plans two broadcast hash joins.
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "90",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5"
+    ) {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT value FROM testData join testData2 ON key = a " +
           "join (SELECT value v from testData join testData3 ON key = a) on value = v")
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/GlutenAdaptiveQueryExecSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/GlutenAdaptiveQueryExecSuite.scala
index 6bbeb801825d..5b5ee83be49b 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/GlutenAdaptiveQueryExecSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/GlutenAdaptiveQueryExecSuite.scala
@@ -437,8 +437,10 @@ class GlutenAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQL
   test("gluten Exchange reuse") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "100",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
+      // Magic threshold: with 100 the CH backend plans two broadcast hash joins.
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "90",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5"
+    ) {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT value FROM testData join testData2 ON key = a " +
           "join (SELECT value v from testData join testData3 ON key = a) on value = v")