Skip to content

Commit

Permalink
some fix
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 authored and baibaichen committed Nov 16, 2023
1 parent db955e4 commit 80ec518
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 19 deletions.
4 changes: 3 additions & 1 deletion cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,11 @@ size_t Partition::spill(NativeWriter & writer)
if (lock.owns_lock())
{
size_t raw_size = 0;
for (const auto & block : blocks)
while (!blocks.empty())
{
auto & block = blocks.back();
raw_size += writer.write(block);
blocks.pop_back();
}
blocks.clear();
return raw_size;
Expand Down
25 changes: 12 additions & 13 deletions cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,23 @@ bool isFixedSizeStateAggregateFunction(const String& name)
return function_set.contains(name);
}

bool isFixedSizeArguments(DataTypes data_types)
bool isFixedSizeArguments(const DataTypes& data_types)
{
return data_types.front()->isValueRepresentedByNumber();
return removeNullable(data_types.front())->isValueRepresentedByNumber();
}

bool isFixedSizeAggregateFunction(DB::AggregateFunctionPtr function)
bool isFixedSizeAggregateFunction(const DB::AggregateFunctionPtr& function)
{
return isFixedSizeStateAggregateFunction(function->getName()) && isFixedSizeArguments(function->getArgumentTypes());
}

DB::ColumnWithTypeAndName convertAggregateStateToFixedString(DB::ColumnWithTypeAndName col)
DB::ColumnWithTypeAndName convertAggregateStateToFixedString(const DB::ColumnWithTypeAndName& col)
{
if (!WhichDataType(col.type).isAggregateFunction())
const auto *aggregate_col = checkAndGetColumn<ColumnAggregateFunction>(*col.column);
if (!aggregate_col)
{
return col;
}
const auto *aggregate_col = checkAndGetColumn<ColumnAggregateFunction>(*col.column);
// only support known fixed size aggregate function
if (!isFixedSizeAggregateFunction(aggregate_col->getAggregateFunction()))
{
Expand All @@ -70,17 +70,16 @@ DB::ColumnWithTypeAndName convertAggregateStateToFixedString(DB::ColumnWithTypeA
return DB::ColumnWithTypeAndName(std::move(res_col), res_type, col.name);
}

DB::ColumnWithTypeAndName convertAggregateStateToString(DB::ColumnWithTypeAndName col)
DB::ColumnWithTypeAndName convertAggregateStateToString(const DB::ColumnWithTypeAndName& col)
{
if (!WhichDataType(col.type).isAggregateFunction())
const auto *aggregate_col = checkAndGetColumn<ColumnAggregateFunction>(*col.column);
if (!aggregate_col)
{
return col;
}
const auto *aggregate_col = checkAndGetColumn<ColumnAggregateFunction>(*col.column);
auto res_type = std::make_shared<DataTypeString>();
auto res_col = res_type->createColumn();
PaddedPODArray<UInt8> & column_chars = assert_cast<ColumnString &>(*res_col).getChars();
column_chars.reserve(aggregate_col->size() * 60);
IColumn::Offsets & column_offsets = assert_cast<ColumnString &>(*res_col).getOffsets();
auto value_writer = WriteBufferFromVector<PaddedPODArray<UInt8>>(column_chars);
column_offsets.reserve(aggregate_col->size());
Expand All @@ -93,15 +92,14 @@ DB::ColumnWithTypeAndName convertAggregateStateToString(DB::ColumnWithTypeAndNam
return DB::ColumnWithTypeAndName(std::move(res_col), res_type, col.name);
}

DB::ColumnWithTypeAndName convertFixedStringToAggregateState(DB::ColumnWithTypeAndName col, DB::DataTypePtr type)
DB::ColumnWithTypeAndName convertFixedStringToAggregateState(const DB::ColumnWithTypeAndName & col, const DB::DataTypePtr & type)
{
chassert(WhichDataType(type).isAggregateFunction());
auto res_col = type->createColumn();
const auto * agg_type = checkAndGetDataType<DataTypeAggregateFunction>(type.get());
ColumnAggregateFunction & real_column = typeid_cast<ColumnAggregateFunction &>(*res_col);
auto & arena = real_column.createOrGetArena();
ColumnAggregateFunction::Container & vec = real_column.getData();

vec.reserve(col.column->size());
auto agg_function = agg_type->getFunction();
size_t size_of_state = agg_function->sizeOfData();
Expand All @@ -120,9 +118,10 @@ DB::ColumnWithTypeAndName convertFixedStringToAggregateState(DB::ColumnWithTypeA
}
return DB::ColumnWithTypeAndName(std::move(res_col), type, col.name);
}
DB::Block convertAggregateStateInBlock(DB::Block block)
DB::Block convertAggregateStateInBlock(DB::Block& block)
{
ColumnsWithTypeAndName columns;
columns.reserve(block.columns());
for (const auto & item : block.getColumnsWithTypeAndName())
{
if (WhichDataType(item.type).isAggregateFunction())
Expand Down
10 changes: 5 additions & 5 deletions cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@

namespace local_engine {

bool isFixedSizeAggregateFunction(DB::AggregateFunctionPtr function);
bool isFixedSizeAggregateFunction(const DB::AggregateFunctionPtr & function);

DB::Block convertAggregateStateInBlock(DB::Block block);
DB::Block convertAggregateStateInBlock(DB::Block& block);

DB::ColumnWithTypeAndName convertAggregateStateToFixedString(DB::ColumnWithTypeAndName col);
DB::ColumnWithTypeAndName convertAggregateStateToFixedString(const DB::ColumnWithTypeAndName & col);

DB::ColumnWithTypeAndName convertAggregateStateToString(DB::ColumnWithTypeAndName col);
DB::ColumnWithTypeAndName convertAggregateStateToString(const DB::ColumnWithTypeAndName & col);

DB::ColumnWithTypeAndName convertFixedStringToAggregateState(DB::ColumnWithTypeAndName col, DB::DataTypePtr type);
DB::ColumnWithTypeAndName convertFixedStringToAggregateState(const DB::ColumnWithTypeAndName & col, const DB::DataTypePtr & type);

}

0 comments on commit 80ec518

Please sign in to comment.