[CH] Optimize aggregate state serialization performance #3279

Merged: 14 commits, Nov 16, 2023
@@ -290,4 +290,44 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
}
}
}

test("collect_set") {
val sql =
"""
|select a, b from (
|select n_regionkey as a, collect_set(if(n_regionkey=0, n_name, null))
| as set from nation group by n_regionkey)
|lateral view explode(set) as b
|order by a, b
|""".stripMargin
runQueryAndCompare(sql)(checkOperatorMatch[CHHashAggregateExecTransformer])
}

test("test 'aggregate function collect_list'") {
val df = runQueryAndCompare(
"select l_orderkey,from_unixtime(l_orderkey, 'yyyy-MM-dd HH:mm:ss') " +
"from lineitem order by l_orderkey desc limit 10"
)(checkOperatorMatch[ProjectExecTransformer])
checkLengthAndPlan(df, 10)
}

test("test max string") {
withSQLConf(("spark.gluten.sql.columnar.force.hashagg", "true")) {
val sql =
"""
|SELECT
| l_returnflag,
| l_linestatus,
| max(l_comment)
|FROM
| lineitem
|WHERE
| l_shipdate <= date'1998-09-02' - interval 1 day
|GROUP BY
| l_returnflag,
| l_linestatus
|""".stripMargin
runQueryAndCompare(sql, noFallBack = false) { df => }
}
}
}
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Builder/SerializedPlanBuilder.cpp
@@ -259,7 +259,7 @@ std::shared_ptr<substrait::Type> SerializedPlanBuilder::buildType(const DB::Data
res->mutable_i32()->set_nullability(type_nullability);
else if (which.isInt64())
res->mutable_i64()->set_nullability(type_nullability);
else if (which.isString() || which.isAggregateFunction())
else if (which.isStringOrFixedString() || which.isAggregateFunction())
res->mutable_binary()->set_nullability(type_nullability); /// Spark Binary type is more similar to CH String type
else if (which.isFloat32())
res->mutable_fp32()->set_nullability(type_nullability);
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
@@ -16,7 +16,7 @@
*/
#include "StorageJoinFromReadBuffer.h"

#include <Formats/NativeReader.h>
#include <Storages/IO/NativeReader.h>
#include <Interpreters/Context.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/TableJoin.h>
@@ -42,7 +42,7 @@ using namespace DB;

void restore(DB::ReadBuffer & in, IJoin & join, const Block & sample_block)
{
NativeReader block_stream(in, 0);
local_engine::NativeReader block_stream(in);

ProfileInfo info;
{
6 changes: 5 additions & 1 deletion cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
@@ -17,6 +17,7 @@
#include "CachedShuffleWriter.h"
#include <Poco/StringTokenizer.h>
#include <Common/Stopwatch.h>
#include <Storages/IO/AggregateSerializationUtils.h>
#include <Shuffle/PartitionWriter.h>
#include <jni/CelebornClient.h>
#include <jni/jni_common.h>
@@ -94,6 +95,10 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions
void CachedShuffleWriter::split(DB::Block & block)
{
initOutputIfNeeded(block);
Stopwatch split_time_watch;
split_time_watch.start();
block = convertAggregateStateInBlock(block);
split_result.total_split_time += split_time_watch.elapsedNanoseconds();

Stopwatch compute_pid_time_watch;
compute_pid_time_watch.start();
@@ -105,7 +110,6 @@ void CachedShuffleWriter::split(DB::Block & block)
{
out_block.insert(block.getByPosition(output_columns_indicies[col]));
}

partition_writer->write(partition_info, out_block);

if (options.spill_threshold > 0 && partition_writer->totalCacheSize() > options.spill_threshold)
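Editor's note: convertAggregateStateInBlock comes from the newly included Storages/IO/AggregateSerializationUtils.h, which is not part of this diff. The sketch below is only an illustration of the idea, assuming the helper replaces ColumnAggregateFunction columns with a plain String column holding each state serialized via the aggregate function's serialize(); the actual implementation may differ.

// Illustrative sketch only (not code from this PR): a plausible shape for
// convertAggregateStateInBlock as declared in Storages/IO/AggregateSerializationUtils.h.
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnString.h>
#include <Core/Block.h>
#include <DataTypes/DataTypeString.h>
#include <IO/WriteBufferFromString.h>
#include <Common/typeid_cast.h>

DB::Block convertAggregateStateInBlockSketch(const DB::Block & block)
{
    DB::ColumnsWithTypeAndName columns;
    columns.reserve(block.columns());
    for (const auto & col : block)
    {
        const auto * agg = typeid_cast<const DB::ColumnAggregateFunction *>(col.column.get());
        if (!agg)
        {
            columns.emplace_back(col); // ordinary columns pass through untouched
            continue;
        }
        // Serialize every aggregate state into a String column so the shuffle
        // writer can handle it like regular binary data.
        auto serialized = DB::ColumnString::create();
        const auto & func = agg->getAggregateFunction();
        for (const auto * state : agg->getData())
        {
            DB::WriteBufferFromOwnString out;
            func->serialize(state, out);
            const auto & bytes = out.str();
            serialized->insertData(bytes.data(), bytes.size());
        }
        columns.emplace_back(std::move(serialized), std::make_shared<DB::DataTypeString>(), col.name);
    }
    return DB::Block(columns);
}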
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Shuffle/NativeSplitter.cpp
@@ -30,6 +30,7 @@
#include <Common/Exception.h>
#include <Common/JNIUtils.h>
#include <Common/logger_useful.h>
#include <Storages/IO/AggregateSerializationUtils.h>

namespace local_engine
{
18 changes: 11 additions & 7 deletions cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -29,6 +29,7 @@
#include <Common/CHUtil.h>
#include <IO/WriteBufferFromString.h>
#include <format>
#include <Storages/IO/NativeWriter.h>

using namespace DB;

@@ -56,7 +57,7 @@ void local_engine::PartitionWriter::write(const PartitionInfo& partition_info, D
if (buffer.size() >= shuffle_writer->options.split_size)
{
Block block = buffer.releaseColumns();
auto bytes = block.bytes();
auto bytes = block.allocatedBytes();
total_partition_buffer_size += bytes;
shuffle_writer->split_result.raw_partition_length[i] += bytes;
partition_buffer[i].addBlock(block);
@@ -73,7 +74,7 @@ void LocalPartitionWriter::evictPartitions(bool for_memory_spill)
WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size);
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
NativeWriter writer(compressed_output, 0, shuffle_writer->output_header);
NativeWriter writer(compressed_output, shuffle_writer->output_header);
SpillInfo info;
info.spilled_file = file;
size_t partition_id = 0;
@@ -122,15 +123,16 @@ std::vector<Int64> LocalPartitionWriter::mergeSpills(WriteBuffer& data_file)
{
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
CompressedWriteBuffer compressed_output(data_file, codec, shuffle_writer->options.io_buffer_size);
NativeWriter writer(compressed_output, 0, shuffle_writer->output_header);
NativeWriter writer(compressed_output, shuffle_writer->output_header);

std::vector<Int64> partition_length;
partition_length.resize(shuffle_writer->options.partition_nums, 0);
std::vector<ReadBufferPtr> spill_inputs;
spill_inputs.reserve(spill_infos.size());
for (const auto & spill : spill_infos)
{
spill_inputs.emplace_back(std::make_shared<ReadBufferFromFile>(spill.spilled_file, shuffle_writer->options.io_buffer_size));
// only use readBig
spill_inputs.emplace_back(std::make_shared<ReadBufferFromFile>(spill.spilled_file, 0));
}

Stopwatch write_time_watch;
@@ -229,7 +231,7 @@ void CelebornPartitionWriter::evictPartitions(bool for_memory_spill)
WriteBufferFromOwnString output;
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
NativeWriter writer(compressed_output, 0, shuffle_writer->output_header);
NativeWriter writer(compressed_output, shuffle_writer->output_header);
size_t raw_size = partition.spill(writer);
compressed_output.sync();
Stopwatch push_time_watch;
@@ -302,15 +304,17 @@ void Partition::clear()
blocks.clear();
}

size_t Partition::spill(DB::NativeWriter & writer)
size_t Partition::spill(NativeWriter & writer)
{
std::unique_lock<std::mutex> lock(mtx, std::try_to_lock);
if (lock.owns_lock())
{
size_t raw_size = 0;
for (const auto & block : blocks)
while (!blocks.empty())
{
auto & block = blocks.back();
raw_size += writer.write(block);
blocks.pop_back();
}
blocks.clear();
return raw_size;
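Editor's note: two things change in this file. NativeWriter is now the local_engine variant (constructed from a write buffer and a header, with no format-version argument), and Partition::spill drains the buffered blocks as it writes them, so memory held by already-spilled blocks is released while the spill is still in progress rather than only after the loop. A minimal standalone sketch of that drain pattern, with illustrative names not taken from the diff:

// Illustrative sketch of the drain-while-spilling pattern used in Partition::spill above.
#include <cstddef>
#include <vector>

template <typename Block, typename Writer>
size_t drainAndSpill(std::vector<Block> & blocks, Writer & writer)
{
    size_t raw_size = 0;
    while (!blocks.empty())
    {
        raw_size += writer.write(blocks.back()); // write the most recently buffered block
        blocks.pop_back();                       // and release its memory immediately
    }
    return raw_size;
}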
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/PartitionWriter.h
@@ -48,7 +48,7 @@ class Partition
void addBlock(DB::Block & block);
bool empty() const;
void clear();
size_t spill(DB::NativeWriter & writer);
size_t spill(NativeWriter & writer);

private:
std::vector<DB::Block> blocks;
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Shuffle/ShuffleReader.cpp
@@ -37,11 +37,11 @@ local_engine::ShuffleReader::ShuffleReader(std::unique_ptr<ReadBuffer> in_, bool
{
compressed_in = std::make_unique<CompressedReadBuffer>(*in);
configureCompressedReadBuffer(static_cast<DB::CompressedReadBuffer &>(*compressed_in));
input_stream = std::make_unique<NativeReader>(*compressed_in, 0);
input_stream = std::make_unique<NativeReader>(*compressed_in);
}
else
{
input_stream = std::make_unique<NativeReader>(*in, 0);
input_stream = std::make_unique<NativeReader>(*in);
}
}
Block * local_engine::ShuffleReader::read()
3 changes: 2 additions & 1 deletion cpp-ch/local-engine/Shuffle/ShuffleReader.h
@@ -19,6 +19,7 @@
#include <Formats/NativeReader.h>
#include <IO/BufferWithOwnMemory.h>
#include <Common/BlockIterator.h>
#include <Storages/IO/NativeReader.h>

namespace DB
{
@@ -42,7 +43,7 @@ class ShuffleReader : BlockIterator
private:
std::unique_ptr<DB::ReadBuffer> in;
std::unique_ptr<DB::ReadBuffer> compressed_in;
std::unique_ptr<DB::NativeReader> input_stream;
std::unique_ptr<local_engine::NativeReader> input_stream;
DB::Block header;
};

25 changes: 18 additions & 7 deletions cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp
@@ -21,7 +21,7 @@
#include <string>
#include <fcntl.h>
#include <Compression/CompressionFactory.h>
#include <Functions/FunctionFactory.h>
#include <Storages/IO/AggregateSerializationUtils.h>
#include <IO/BrotliWriteBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
@@ -39,7 +39,12 @@ void ShuffleSplitter::split(DB::Block & block)
{
return;
}
initOutputIfNeeded(block);
computeAndCountPartitionId(block);
Stopwatch split_time_watch;
split_time_watch.start();
block = convertAggregateStateInBlock(block);
split_result.total_split_time += split_time_watch.elapsedNanoseconds();
splitBlockByPartition(block);
}
SplitResult ShuffleSplitter::stop()
@@ -70,12 +75,12 @@ SplitResult ShuffleSplitter::stop()
stopped = true;
return split_result;
}
void ShuffleSplitter::splitBlockByPartition(DB::Block & block)

void ShuffleSplitter::initOutputIfNeeded(Block & block)
{
Stopwatch split_time_watch;
split_time_watch.start();
if (!output_header.columns()) [[unlikely]]
if (output_header.columns() == 0) [[unlikely]]
{
output_header = block.cloneEmpty();
if (output_columns_indicies.empty())
{
output_header = block.cloneEmpty();
@@ -86,14 +91,20 @@ void ShuffleSplitter::splitBlockByPartition(DB::Block & block)
}
else
{
DB::ColumnsWithTypeAndName cols;
ColumnsWithTypeAndName cols;
for (const auto & index : output_columns_indicies)
{
cols.push_back(block.getByPosition(index));
}
output_header = DB::Block(cols);
}
}
}

void ShuffleSplitter::splitBlockByPartition(DB::Block & block)
{
Stopwatch split_time_watch;
split_time_watch.start();
DB::Block out_block;
for (size_t col = 0; col < output_header.columns(); ++col)
{
@@ -148,7 +159,7 @@ void ShuffleSplitter::spillPartition(size_t partition_id)
{
partition_write_buffers[partition_id] = getPartitionWriteBuffer(partition_id);
partition_outputs[partition_id]
= std::make_unique<DB::NativeWriter>(*partition_write_buffers[partition_id], 0, partition_buffer[partition_id].getHeader());
= std::make_unique<NativeWriter>(*partition_write_buffers[partition_id], output_header);
}
DB::Block result = partition_buffer[partition_id].releaseColumns();
if (result.rows() > 0)
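Editor's note: because the new logic is spread across several hunks, the resulting control flow of ShuffleSplitter::split is reassembled below from the lines shown above. The empty-input guard ahead of initOutputIfNeeded sits outside the shown hunks and is omitted, and the comments are editorial, not from the PR.

// Control flow of ShuffleSplitter::split after this change (reassembled from the hunks above).
void ShuffleSplitter::split(DB::Block & block)
{
    initOutputIfNeeded(block);                   // build output_header once, from the first block seen
    computeAndCountPartitionId(block);           // assign a partition id to every row
    Stopwatch split_time_watch;
    split_time_watch.start();
    block = convertAggregateStateInBlock(block); // rewrite aggregate-state columns before buffering
    split_result.total_split_time += split_time_watch.elapsedNanoseconds();
    splitBlockByPartition(block);                // scatter rows into per-partition buffers
}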
5 changes: 3 additions & 2 deletions cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
@@ -18,7 +18,7 @@
#include <memory>
#include <Columns/IColumn.h>
#include <Core/Block.h>
#include <Formats/NativeWriter.h>
#include <Storages/IO/NativeWriter.h>
#include <Functions/IFunction.h>
#include <IO/WriteBufferFromFile.h>
#include <Shuffle/SelectorBuilder.h>
@@ -101,6 +101,7 @@ class ShuffleSplitter : public ShuffleWriterBase

private:
void init();
void initOutputIfNeeded(DB::Block & block);
void splitBlockByPartition(DB::Block & block);
void spillPartition(size_t partition_id);
std::string getPartitionTempFile(size_t partition_id);
@@ -111,7 +112,7 @@
bool stopped = false;
PartitionInfo partition_info;
std::vector<ColumnsBuffer> partition_buffer;
std::vector<std::unique_ptr<DB::NativeWriter>> partition_outputs;
std::vector<std::unique_ptr<local_engine::NativeWriter>> partition_outputs;
std::vector<std::unique_ptr<DB::WriteBuffer>> partition_write_buffers;
std::vector<std::unique_ptr<DB::WriteBuffer>> partition_cached_write_buffers;
std::vector<local_engine::CompressedWriteBuffer *> compressed_buffers;
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
@@ -41,11 +41,11 @@ void ShuffleWriter::write(const Block & block)
{
if (compression_enable)
{
native_writer = std::make_unique<NativeWriter>(*compressed_out, 0, block.cloneEmpty());
native_writer = std::make_unique<NativeWriter>(*compressed_out, block.cloneEmpty());
}
else
{
native_writer = std::make_unique<NativeWriter>(*write_buffer, 0, block.cloneEmpty());
native_writer = std::make_unique<NativeWriter>(*write_buffer, block.cloneEmpty());
}
}
if (block.rows() > 0)
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Shuffle/ShuffleWriter.h
@@ -16,7 +16,7 @@
*/
#pragma once
#include <jni.h>
#include <Formats/NativeWriter.h>
#include <Storages/IO/NativeWriter.h>

namespace local_engine
{
@@ -32,7 +32,7 @@ class ShuffleWriter
private:
std::unique_ptr<DB::WriteBuffer> compressed_out;
std::unique_ptr<DB::WriteBuffer> write_buffer;
std::unique_ptr<DB::NativeWriter> native_writer;
std::unique_ptr<NativeWriter> native_writer;
bool compression_enable;
};
}