-
Notifications
You must be signed in to change notification settings - Fork 446
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CH] Optimize aggregate state serialization performance #3279
[CH] Optimize aggregate state serialization performance #3279
Conversation
Thanks for opening a pull request! Could you open an issue for this pull request on Github Issues? https://github.com/oap-project/gluten/issues Then could you also rename commit message and pull request title in the following format?
See also: |
Run Gluten Clickhouse CI |
1 similar comment
Run Gluten Clickhouse CI |
return col; | ||
} | ||
const auto *aggregate_col = checkAndGetColumn<ColumnAggregateFunction>(*col.column); | ||
size_t state_size = aggregate_col->getAggregateFunction()->sizeOfData(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the state size of max(String) and collect_list(x) fixed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exclude collect_list, max(String) will use sort aggregate and fallback now
cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp
Outdated
Show resolved
Hide resolved
} | ||
|
||
size_t NativeWriter::write(const DB::Block & block) | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the purpose of IndexOfBlockForNativeFormat
in the original NativeWriter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some index data..useless for us
bc8c977
to
bde57b9
Compare
Run Gluten Clickhouse CI |
2 similar comments
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
{ | ||
/** If there are columns-constants - then we materialize them. | ||
* (Since the data type does not know how to serialize / deserialize constants.) | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can design some protocol to write less data for constant column to reducing IO amount.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个是CH copy过来的。。暂时不做优化
bool isFixedSizeStateAggregateFunction(const String& name) | ||
{ | ||
// TODO max(String) should exclude, but fallback now | ||
static const std::set<String> function_set = {"min", "max", "sum", "count", "avg"}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html, maybe other functions can also be added to the set, like mean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里我会换一个方案来同时支持定长和变长的聚合
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
目前先用这个,后面遇到了在更新
1f50efc
to
de46b01
Compare
Run Gluten Clickhouse CI |
1 similar comment
Run Gluten Clickhouse CI |
f3dd446
to
c466268
Compare
Run Gluten Clickhouse CI |
3 similar comments
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
83b85ba
to
de32d27
Compare
Run Gluten Clickhouse CI |
de32d27
to
e0e8249
Compare
Run Gluten Clickhouse CI |
@@ -438,8 +438,10 @@ class GlutenAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQL | |||
test("gluten Exchange reuse") { | |||
withSQLConf( | |||
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", | |||
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "100", | |||
SQLConf.SHUFFLE_PARTITIONS.key -> "5") { | |||
// magic threshold, ch backend has two bhj when threshold is 100 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
...33/src/test/scala/org/apache/spark/sql/execution/adaptive/GlutenAdaptiveQueryExecSuite.scala
Show resolved
Hide resolved
size_t state_size = aggregate_col->getAggregateFunction()->sizeOfData(); | ||
auto res_type = std::make_shared<DataTypeFixedString>(state_size); | ||
auto res_col = res_type->createColumn(); | ||
PaddedPODArray<UInt8> & column_chars_t = assert_cast<ColumnFixedString &>(*res_col).getChars(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use ColumnFixedString::reserve
is better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
同下
for (const auto & item : aggregate_col->getData()) | ||
{ | ||
column_chars_t.insert_assume_reserved(item, item + state_size); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for small state objects, could we try to use memcpy
?
too much function call may have some performance issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个写法是参考arrowColumnToCHColumn的readColumnWithNumericData。应该不会有性能问题
e0e8249
to
7e54520
Compare
Run Gluten Clickhouse CI |
2 similar comments
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
LGTM |
Run Gluten Clickhouse CI |
return isFixedSizeStateAggregateFunction(function->getName()) && isFixedSizeArguments(function->getArgumentTypes()); | ||
} | ||
|
||
DB::ColumnWithTypeAndName convertAggregateStateToFixedString(DB::ColumnWithTypeAndName col) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
尽量避免传值,避免不必要的复制开销。
} | ||
return DB::ColumnWithTypeAndName(std::move(res_col), type, col.name); | ||
} | ||
DB::Block convertAggregateStateInBlock(DB::Block block) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
避免传值
|
||
bool isFixedSizeArguments(DataTypes data_types) | ||
{ | ||
return data_types.front()->isValueRepresentedByNumber(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isValueUnambiguouslyRepresentedInFixedSizeContiguousMemoryRegion
使用这个接口是否更为准确。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
先限制在数字类型,其他类型没测试过
} | ||
DB::Block convertAggregateStateInBlock(DB::Block block) | ||
{ | ||
ColumnsWithTypeAndName columns; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
加上reserve
return data_types.front()->isValueRepresentedByNumber(); | ||
} | ||
|
||
bool isFixedSizeAggregateFunction(DB::AggregateFunctionPtr function) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
避免传值,其他类似地方也可以优化下。
|
||
DB::ColumnWithTypeAndName convertAggregateStateToFixedString(DB::ColumnWithTypeAndName col) | ||
{ | ||
if (!WhichDataType(col.type).isAggregateFunction()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里的判断可以去掉,改成下面判断aggregate_col是否为nullptr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这种类型AggregateFunction(sum, Nullable(Int64)) 是否可以走fixed string
现在实际上是走的变长string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bug, 需要remove nullable
auto res_type = std::make_shared<DataTypeString>(); | ||
auto res_col = res_type->createColumn(); | ||
PaddedPODArray<UInt8> & column_chars = assert_cast<ColumnString &>(*res_col).getChars(); | ||
column_chars.reserve(aggregate_col->size() * 60); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
60这个数字是怎么来的? 会不会导致分配超出实际需要的内存
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
测试代码,删了
Run Gluten Clickhouse CI |
} | ||
|
||
bool isFixedSizeAggregateFunction(DB::AggregateFunctionPtr function) | ||
bool isFixedSizeAggregateFunction(const DB::AggregateFunctionPtr& function) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shared_ptr
不用传引用吧 直接传值
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
就这样吧,没有太大区别
for (const auto & item : aggregate_col->getData()) | ||
{ | ||
aggregate_col->getAggregateFunction()->serialize(item, value_writer); | ||
writeChar('\0', value_writer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个有必要?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
字符串需要\0结尾,必须的
cfa0a4f
to
80ec518
Compare
Run Gluten Clickhouse CI |
LGTM |
1 similar comment
LGTM |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cool, let's merge
What changes were proposed in this pull request?
Optimize aggregate state serialization performance, convert fixed size agg state to fixed string,
convert variable size aggregation state to string.
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)