Skip to content

Commit

Permalink
[improve](ub) fix some runtime error of ubsan when downcast (apache#3…
Browse files Browse the repository at this point in the history
…5343)

those code could work well, but it will be report some runtime error under UBSAN,
so refactor it to let's ubsan could running happy.
  • Loading branch information
zhangstar333 authored May 27, 2024
1 parent b66f9ff commit 6e7ec66
Show file tree
Hide file tree
Showing 14 changed files with 55 additions and 58 deletions.
6 changes: 2 additions & 4 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,7 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
_places.data(),
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
_deserialize_buffer.data(),
(vectorized::ColumnString*)(column.get()), _agg_arena_pool,
_deserialize_buffer.data(), column.get(), _agg_arena_pool,
rows);
}
} else {
Expand Down Expand Up @@ -348,8 +347,7 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
_places.data(),
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
_deserialize_buffer.data(),
(vectorized::ColumnString*)(column.get()), _agg_arena_pool,
_deserialize_buffer.data(), column.get(), _agg_arena_pool,
rows);
}
} else {
Expand Down
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,7 @@ Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
->function()
->deserialize_and_merge_vec_selected(
_places.data(), _shared_state->offsets_of_aggregate_states[i],
_deserialize_buffer.data(),
(vectorized::ColumnString*)(column.get()),
_deserialize_buffer.data(), column.get(),
_shared_state->agg_arena_pool.get(), rows);
}
} else {
Expand Down Expand Up @@ -532,7 +531,7 @@ Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
SCOPED_TIMER(_deserialize_data_timer);
Base::_shared_state->aggregate_evaluators[i]->function()->deserialize_and_merge_vec(
_places.data(), _shared_state->offsets_of_aggregate_states[i],
_deserialize_buffer.data(), (vectorized::ColumnString*)(column.get()),
_deserialize_buffer.data(), column.get(),
_shared_state->agg_arena_pool.get(), rows);
}
}
Expand Down
6 changes: 2 additions & 4 deletions be/src/pipeline/exec/streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,7 @@ Status StreamingAggLocalState::_merge_with_serialized_key_helper(vectorized::Blo
_places.data(),
Base::_parent->template cast<StreamingAggOperatorX>()
._offsets_of_aggregate_states[i],
_deserialize_buffer.data(), (vectorized::ColumnString*)(column.get()),
_agg_arena_pool.get(), rows);
_deserialize_buffer.data(), column.get(), _agg_arena_pool.get(), rows);
}
} else {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected(
Expand Down Expand Up @@ -299,8 +298,7 @@ Status StreamingAggLocalState::_merge_with_serialized_key_helper(vectorized::Blo
_places.data(),
Base::_parent->template cast<StreamingAggOperatorX>()
._offsets_of_aggregate_states[i],
_deserialize_buffer.data(), (vectorized::ColumnString*)(column.get()),
_agg_arena_pool.get(), rows);
_deserialize_buffer.data(), column.get(), _agg_arena_pool.get(), rows);
}
} else {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
Expand Down
22 changes: 13 additions & 9 deletions be/src/vec/aggregate_functions/aggregate_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include "util/defer_op.h"
#include "vec/columns/column_complex.h"
#include "vec/columns/column_string.h"
#include "vec/common/assert_cast.h"
#include "vec/common/hash_table/phmap_fwd_decl.h"
#include "vec/common/string_buffer.hpp"
#include "vec/core/block.h"
Expand Down Expand Up @@ -144,13 +146,12 @@ class IAggregateFunction {
size_t num_rows) const = 0;

virtual void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
AggregateDataPtr rhs, const IColumn* column,
Arena* arena, const size_t num_rows) const = 0;

virtual void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs,
const ColumnString* column, Arena* arena,
const size_t num_rows) const = 0;
AggregateDataPtr rhs, const IColumn* column,
Arena* arena, const size_t num_rows) const = 0;

virtual void deserialize_from_column(AggregateDataPtr places, const IColumn& column,
Arena* arena, size_t num_rows) const = 0;
Expand Down Expand Up @@ -375,13 +376,14 @@ class IAggregateFunctionHelper : public IAggregateFunction {
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
const auto size_of_data = assert_cast<const Derived*>(this)->size_of_data();
const auto* column_string = assert_cast<const ColumnString*>(column);
for (size_t i = 0; i != num_rows; ++i) {
try {
auto rhs_place = rhs + size_of_data * i;
VectorBufferReader buffer_reader(column->get_data_at(i));
VectorBufferReader buffer_reader(column_string->get_data_at(i));
assert_cast<const Derived*>(this)->create(rhs_place);
assert_cast<const Derived*>(this)->deserialize_and_merge(
places[i] + offset, rhs_place, buffer_reader, arena);
Expand All @@ -397,17 +399,19 @@ class IAggregateFunctionHelper : public IAggregateFunction {
}

void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
AggregateDataPtr rhs, const IColumn* column,
Arena* arena, const size_t num_rows) const override {
const auto size_of_data = assert_cast<const Derived*>(this)->size_of_data();
const auto* column_string = assert_cast<const ColumnString*>(column);
for (size_t i = 0; i != num_rows; ++i) {
try {
auto rhs_place = rhs + size_of_data * i;
VectorBufferReader buffer_reader(column->get_data_at(i));
VectorBufferReader buffer_reader(column_string->get_data_at(i));
assert_cast<const Derived*>(this)->create(rhs_place);
if (places[i])
if (places[i]) {
assert_cast<const Derived*>(this)->deserialize_and_merge(
places[i] + offset, rhs_place, buffer_reader, arena);
}
} catch (...) {
for (int j = 0; j < i; ++j) {
auto place = rhs + size_of_data * j;
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/aggregate_functions/aggregate_function_avg.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,15 @@ class AggregateFunctionAvg final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec(places, offset, rhs, arena, num_rows);
}

void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
AggregateDataPtr rhs, const IColumn* column,
Arena* arena, const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
Expand Down
12 changes: 6 additions & 6 deletions be/src/vec/aggregate_functions/aggregate_function_bitmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,11 @@ class AggregateFunctionBitmapSerializationHelper
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
if (version >= BITMAP_SERDE) {
auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
auto* data = col.get_data().data();
const auto& col = assert_cast<const ColumnBitmap&>(*column);
const auto* data = col.get_data().data();
for (size_t i = 0; i != num_rows; ++i) {
this->data(places[i] + offset).merge(data[i]);
}
Expand All @@ -236,11 +236,11 @@ class AggregateFunctionBitmapSerializationHelper
}

void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
AggregateDataPtr rhs, const IColumn* column,
Arena* arena, const size_t num_rows) const override {
if (version >= BITMAP_SERDE) {
auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
auto* data = col.get_data().data();
const auto& col = assert_cast<const ColumnBitmap&>(*column);
const auto* data = col.get_data().data();
for (size_t i = 0; i != num_rows; ++i) {
if (places[i]) {
this->data(places[i] + offset).merge(data[i]);
Expand Down
12 changes: 6 additions & 6 deletions be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,20 +185,20 @@ class AggregateFunctionBitmapAgg final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
auto* data = col.get_data().data();
const auto& col = assert_cast<const ColumnBitmap&>(*column);
const auto* data = col.get_data().data();
for (size_t i = 0; i != num_rows; ++i) {
this->data(places[i] + offset).value |= data[i];
}
}

void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
AggregateDataPtr rhs, const IColumn* column,
Arena* arena, const size_t num_rows) const override {
auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
auto* data = col.get_data().data();
const auto& col = assert_cast<const ColumnBitmap&>(*column);
const auto* data = col.get_data().data();
for (size_t i = 0; i != num_rows; ++i) {
if (places[i]) {
this->data(places[i] + offset).value |= data[i];
Expand Down
10 changes: 4 additions & 6 deletions be/src/vec/aggregate_functions/aggregate_function_collect.h
Original file line number Diff line number Diff line change
Expand Up @@ -634,12 +634,11 @@ class AggregateFunctionCollect
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
if constexpr (ShowNull::value) {
for (size_t i = 0; i != num_rows; ++i) {
this->data(places[i] + offset)
.deserialize_and_merge(*assert_cast<const IColumn*>(column), i);
this->data(places[i] + offset).deserialize_and_merge(*column, i);
}
} else {
return BaseHelper::deserialize_and_merge_vec(places, offset, rhs, column, arena,
Expand Down Expand Up @@ -674,13 +673,12 @@ class AggregateFunctionCollect
}

void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
AggregateDataPtr rhs, const IColumn* column,
Arena* arena, const size_t num_rows) const override {
if constexpr (ShowNull::value) {
for (size_t i = 0; i != num_rows; ++i) {
if (places[i]) {
this->data(places[i] + offset)
.deserialize_and_merge(*assert_cast<const IColumn*>(column), i);
this->data(places[i] + offset).deserialize_and_merge(*column, i);
}
}
} else {
Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/aggregate_functions/aggregate_function_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,15 @@ class AggregateFunctionCount final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec(places, offset, rhs, arena, num_rows);
}

void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
AggregateDataPtr rhs, const IColumn* column,
Arena* arena, const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
Expand Down Expand Up @@ -284,15 +284,15 @@ class AggregateFunctionCountNotNullUnary final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec(places, offset, rhs, arena, num_rows);
}

void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
AggregateDataPtr rhs, const IColumn* column,
Arena* arena, const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/aggregate_functions/aggregate_function_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,19 +300,19 @@ class AggregateFunctionMapAgg final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
auto& col = assert_cast<const ColumnMap&>(*assert_cast<const IColumn*>(column));
const auto& col = assert_cast<const ColumnMap&>(*column);
for (size_t i = 0; i != num_rows; ++i) {
auto map = doris::vectorized::get<Map>(col[i]);
this->data(places[i] + offset).add(map[0], map[1]);
}
}

void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
AggregateDataPtr rhs, const IColumn* column,
Arena* arena, const size_t num_rows) const override {
auto& col = assert_cast<const ColumnMap&>(*assert_cast<const IColumn*>(column));
const auto& col = assert_cast<const ColumnMap&>(*column);
for (size_t i = 0; i != num_rows; ++i) {
if (places[i]) {
auto map = doris::vectorized::get<Map>(col[i]);
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/aggregate_functions/aggregate_function_min_max.h
Original file line number Diff line number Diff line change
Expand Up @@ -624,15 +624,15 @@ class AggregateFunctionsSingleValue final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec(places, offset, rhs, arena, num_rows);
}

void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
AggregateDataPtr rhs, const IColumn* column,
Arena* arena, const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/aggregate_functions/aggregate_function_sum.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,15 @@ class AggregateFunctionSum final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec(places, offset, rhs, arena, num_rows);
}

void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
AggregateDataPtr rhs, const IColumn* column,
Arena* arena, const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
Expand Down
Loading

0 comments on commit 6e7ec66

Please sign in to comment.