Skip to content

Commit

Permalink
optimize shuffle more
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Oct 31, 2023
1 parent de46b01 commit c466268
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 9 deletions.
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ void Partition::clear()
blocks.clear();
}

size_t Partition::spill(DB::NativeWriter & writer)
size_t Partition::spill(NativeWriter & writer)
{
std::unique_lock<std::mutex> lock(mtx, std::try_to_lock);
if (lock.owns_lock())
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/PartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class Partition
void addBlock(DB::Block & block);
bool empty() const;
void clear();
size_t spill(DB::NativeWriter & writer);
size_t spill(NativeWriter & writer);

private:
std::vector<DB::Block> blocks;
Expand Down
28 changes: 27 additions & 1 deletion cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeString.h>


using namespace DB;
Expand Down Expand Up @@ -58,6 +60,30 @@ DB::ColumnWithTypeAndName convertAggregateStateToFixedString(DB::ColumnWithTypeA
}
return DB::ColumnWithTypeAndName(std::move(res_col), res_type, col.name);
}

DB::ColumnWithTypeAndName convertAggregateStateToString(DB::ColumnWithTypeAndName col)
{
if (!isAggregateFunction(col.type))
{
return col;
}
const auto *aggregate_col = checkAndGetColumn<ColumnAggregateFunction>(*col.column);
auto res_type = std::make_shared<DataTypeString>();
auto res_col = res_type->createColumn();
PaddedPODArray<UInt8> & column_chars = assert_cast<ColumnString &>(*res_col).getChars();
column_chars.reserve(aggregate_col->size() * 60);
IColumn::Offsets & column_offsets = assert_cast<ColumnString &>(*res_col).getOffsets();
auto value_writer = WriteBufferFromVector<PaddedPODArray<UInt8>>(column_chars);
column_offsets.reserve(aggregate_col->size());
for (const auto & item : aggregate_col->getData())
{
aggregate_col->getAggregateFunction()->serialize(item, value_writer);
writeChar('\0', value_writer);
column_offsets.emplace_back(value_writer.count());
}
return DB::ColumnWithTypeAndName(std::move(res_col), res_type, col.name);
}

DB::ColumnWithTypeAndName convertFixedStringToAggregateState(DB::ColumnWithTypeAndName col, DB::DataTypePtr type)
{
chassert(isAggregateFunction(type));
Expand Down Expand Up @@ -90,7 +116,7 @@ DB::Block convertAggregateStateInBlock(DB::Block block)
ColumnsWithTypeAndName columns;
for (const auto & item : block.getColumnsWithTypeAndName())
{
columns.emplace_back(convertAggregateStateToFixedString(item));
columns.emplace_back(convertAggregateStateToString(item));
}
return columns;
}
Expand Down
2 changes: 2 additions & 0 deletions cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ DB::Block convertAggregateStateInBlock(DB::Block block);

DB::ColumnWithTypeAndName convertAggregateStateToFixedString(DB::ColumnWithTypeAndName col);

DB::ColumnWithTypeAndName convertAggregateStateToString(DB::ColumnWithTypeAndName col);

DB::ColumnWithTypeAndName convertFixedStringToAggregateState(DB::ColumnWithTypeAndName col, DB::DataTypePtr type);

}
Expand Down
7 changes: 4 additions & 3 deletions cpp-ch/local-engine/Storages/IO/NativeReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ void NativeReader::readAggData(const DB::DataTypeAggregateFunction & data_type,
AggregateDataPtr place = arena.alignedAlloc(size_of_state, align_of_state);

agg_function->create(place);

auto n = istr.read(place, size_of_state);
chassert(n == size_of_state);
// UInt64 size;
// readVarUInt(size, istr);
agg_function->deserialize(place, istr);
istr.ignore();
vec.push_back(place);
}
}
Expand Down
20 changes: 17 additions & 3 deletions cpp-ch/local-engine/Storages/IO/NativeWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <IO/WriteHelpers.h>
#include <DataTypes/Serializations/ISerialization.h>
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnString.h>


using namespace DB;

Expand Down Expand Up @@ -69,8 +71,9 @@ size_t NativeWriter::write(const DB::Block & block)
auto original_type = header.safeGetByPosition(i).type;
/// Type
String type_name = original_type->getName();
if (isAggregateFunction(original_type)
&& header.safeGetByPosition(i).column->getDataType() != block.safeGetByPosition(i).column->getDataType())
bool is_agg_opt = isAggregateFunction(original_type)
&& header.safeGetByPosition(i).column->getDataType() != block.safeGetByPosition(i).column->getDataType();
if (is_agg_opt)
{
writeStringBinary(type_name + AGG_STATE_SUFFIX, ostr);
}
Expand All @@ -83,7 +86,18 @@ size_t NativeWriter::write(const DB::Block & block)
column.column = recursiveRemoveSparse(column.column);
/// Data
if (rows) /// Zero items of data is always represented as zero number of bytes.
writeData(*serialization, column.column, ostr, 0, 0);
{
if (is_agg_opt)
{
const auto * str_col = static_cast<const ColumnString *>(column.column.get());
const PaddedPODArray<UInt8> & column_chars = str_col->getChars();
ostr.write(column_chars.raw_data(), str_col->getOffsets().back());
}
else
{
writeData(*serialization, column.column, ostr, 0, 0);
}
}
}

size_t written_after = ostr.count();
Expand Down

0 comments on commit c466268

Please sign in to comment.