Skip to content

Commit

Permalink
Use HashStringAllocator for raw_vector rows in HashLookup
Browse files Browse the repository at this point in the history
The rows in the HashLookup are stored as raw_vector.
The raw_vector uses a malloc-type allocation to acquire aligned
memory. It was found that this contributes to a large amount of
unaccounted memory, mainly when using the HashProbe and other operators.

This change allows the usage of a custom allocator for the
raw_vector. By default it keeps the current way of allocating
and deallocating.

For raw_vectors representing rows in the HashLookup, the
allocator used is the HashStringAllocator, which allows the
memory to be accounted for.

Co-authored-by: Jimmy Lu <[email protected]>
  • Loading branch information
czentgr and Yuhta committed Jul 15, 2024
1 parent 322d892 commit 33a6c05
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 49 deletions.
49 changes: 29 additions & 20 deletions velox/common/base/RawVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,27 @@

namespace facebook::velox {

/// Default allocator for raw_vector: hands out kAlignment-aligned memory via
/// aligned_alloc/free. Unlike std::allocator, 'n' is a byte count, not an
/// element count (callers pass an already padded byte size).
template <typename T, uint8_t kAlignment>
class StdAlignedAllocator {
 public:
  T* allocate(size_t n) {
    // aligned_alloc requires 'size' to be an integral multiple of the
    // alignment (C11/C17); glibc tolerates violations but other libcs
    // (e.g. macOS) return nullptr. Round up to stay within the contract.
    const size_t bytes =
        (n + kAlignment - 1) / kAlignment * static_cast<size_t>(kAlignment);
    return reinterpret_cast<T*>(aligned_alloc(kAlignment, bytes));
  }

  void deallocate(T* data, size_t /*n*/) {
    ::free(data);
  }
};

/// Class template similar to std::vector with no default construction and a
/// SIMD load worth of padding below and above the data. The idea is that one
/// can access the data at full SIMD width at both ends.
template <typename T>
template <
typename T,
typename Allocator = StdAlignedAllocator<T, simd::kPadding>>
class raw_vector {
public:
raw_vector() {
raw_vector(Allocator allocator = {}) : allocator_(std::move(allocator)) {
static_assert(std::is_trivially_destructible<T>::value);
}

Expand All @@ -42,34 +56,22 @@ class raw_vector {
}
}

// Constructs a copy of 'other'. See operator=. 'data_' must be copied.
raw_vector(const raw_vector<T>& other) {
*this = other;
}

raw_vector(raw_vector<T>&& other) noexcept {
*this = std::move(other);
}

// Moves 'other' to this, leaves 'other' empty, as after default
// construction.
void operator=(const raw_vector<T>& other) {
resize(other.size());
if (other.data_) {
memcpy(
data_,
other.data(),
bits::roundUp(size_ * sizeof(T), simd::kPadding));
}
}

// construction minus the allocator_. As a result the object cannot be reused.
// Move-assigns from 'other': steals the buffer, size, capacity, allocator
// and recorded allocation size, then resets 'other' to the empty state.
// NOTE(review): the destination's current 'data_' is not released before
// being overwritten — move-assigning onto a non-empty raw_vector would leak
// its allocation. Confirm callers only move into empty/fresh vectors.
void operator=(raw_vector<T>&& other) noexcept {
data_ = other.data_;
size_ = other.size_;
capacity_ = other.capacity_;
// The allocator travels with the buffer so the data can be deallocated by
// the allocator that produced it.
allocator_ = std::move(other.allocator_);
allocatedSize_ = other.allocatedSize_;
other.data_ = nullptr;
other.size_ = 0;
other.capacity_ = 0;
other.allocatedSize_ = 0;
}

bool empty() const {
Expand Down Expand Up @@ -152,6 +154,9 @@ class raw_vector {
}

private:
// Constructs a copy of 'other'. See operator=. 'data_' must be copied.
raw_vector(const raw_vector<T>& other) {}

// Adds 'bytes' to the address 'pointer'.
inline T* addBytes(T* pointer, int32_t bytes) {
return reinterpret_cast<T*>(reinterpret_cast<uint64_t>(pointer) + bytes);
Expand All @@ -165,7 +170,8 @@ class raw_vector {

T* allocateData(int32_t size, int32_t& capacity) {
auto bytes = paddedSize(sizeof(T) * size);
auto ptr = reinterpret_cast<T*>(aligned_alloc(simd::kPadding, bytes));
auto ptr = allocator_.allocate(bytes);
allocatedSize_ = bytes;
// Clear the word below the pointer so that we do not read
// uninitialized memory when reading a partial word that extends below
// the pointer.
Expand All @@ -177,7 +183,8 @@ class raw_vector {

// Releases the allocation backing 'data'. 'data' points just above the
// simd::kPadding guard bytes allocated below it, so the true allocation
// start is kPadding bytes lower.
void freeData(T* data) {
if (data_) {
// 'allocatedSize_' already holds the padded byte size recorded by
// allocateData(); pass it through unchanged. Re-computing
// paddedSize(sizeof(T) * allocatedSize_) would double-scale the size
// reported to the allocator.
allocator_.deallocate(addBytes(data, -simd::kPadding), allocatedSize_);
}
}

Expand All @@ -193,6 +200,8 @@ class raw_vector {
T* data_{nullptr};
int32_t size_{0};
int32_t capacity_{0};
Allocator allocator_{};
size_t allocatedSize_{0};
};

// Returns a pointer to 'size' int32_t's with consecutive values
Expand Down
17 changes: 6 additions & 11 deletions velox/common/base/tests/RawVectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,18 @@ TEST(RawVectorTest, resize) {
}

TEST(RawVectorTest, copyAndMove) {
const auto kNumRows = 1000;
raw_vector<int32_t> ints(1000);
const auto kDataSize = kNumRows * sizeof(int32_t);
// a raw_vector is intentionally not initialized.
memset(ints.data(), 11, ints.size() * sizeof(int32_t));
ints[ints.size() - 1] = 12345;
raw_vector<int32_t> intsCopy(ints);
EXPECT_EQ(
0, memcmp(ints.data(), intsCopy.data(), ints.size() * sizeof(int32_t)));
memset(ints.data(), 11, kDataSize);
ints[kNumRows - 1] = 12345;
auto data = ints.data();

raw_vector<int32_t> intsMoved(std::move(ints));
EXPECT_TRUE(ints.empty());

EXPECT_EQ(
0,
memcmp(
intsMoved.data(),
intsCopy.data(),
intsCopy.size() * sizeof(int32_t)));
EXPECT_EQ(0, memcmp(intsMoved.data(), data, kDataSize));
}

TEST(RawVectorTest, iota) {
Expand Down
5 changes: 3 additions & 2 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ void GroupingSet::createHashTable() {
}
}

lookup_ = std::make_unique<HashLookup>(table_->hashers());
lookup_ = std::make_unique<HashLookup>(
table_->hashers(), table_->stringAllocator());
if (!isAdaptive_ && table_->hashMode() != BaseHashTable::HashMode::kHash) {
table_->forceGenericHashMode();
}
Expand All @@ -409,7 +410,7 @@ void GroupingSet::initializeGlobalAggregation() {
return;
}

lookup_ = std::make_unique<HashLookup>(hashers_);
lookup_ = std::make_unique<HashLookup>(hashers_, table_->stringAllocator());
lookup_->reset(1);

// Row layout is:
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ void HashProbe::initialize() {
}

VELOX_CHECK_NULL(lookup_);
lookup_ = std::make_unique<HashLookup>(hashers_);
VELOX_CHECK_NOT_NULL(table_);
lookup_ = std::make_unique<HashLookup>(hashers_, table_->stringAllocator());
auto buildType = joinNode_->sources()[1]->outputType();
auto tableType = makeTableType(buildType.get(), joinNode_->rightKeys());
if (joinNode_->filter()) {
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/HashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1910,7 +1910,7 @@ int32_t HashTable<false>::listNullKeyRows(
if (!iter->initialized) {
VELOX_CHECK_GT(nextOffset_, 0);
VELOX_CHECK_EQ(hashers_.size(), 1);
HashLookup lookup(hashers_);
HashLookup lookup(hashers_, stringAllocator());
if (hashMode_ == HashMode::kHash) {
lookup.hashes.push_back(VectorHasher::kNullHash);
} else {
Expand Down Expand Up @@ -2064,7 +2064,9 @@ template class HashTable<false>;
namespace {
void populateLookupRows(
const SelectivityVector& rows,
raw_vector<vector_size_t>& lookupRows) {
raw_vector<
vector_size_t,
AlignedStlAllocator<vector_size_t, simd::kPadding>>& lookupRows) {
if (rows.isAllSelected()) {
std::iota(lookupRows.begin(), lookupRows.end(), 0);
} else {
Expand Down
14 changes: 10 additions & 4 deletions velox/exec/HashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ struct TableInsertPartitionInfo {

/// Contains input and output parameters for groupProbe and joinProbe APIs.
struct HashLookup {
explicit HashLookup(const std::vector<std::unique_ptr<VectorHasher>>& h)
: hashers(h) {}
HashLookup(
const std::vector<std::unique_ptr<VectorHasher>>& h,
HashStringAllocator* hsa)
: hashers(h),
rows(AlignedStlAllocator<vector_size_t, simd::kPadding>(hsa)) {}

void reset(vector_size_t size) {
rows.resize(size);
Expand All @@ -74,7 +77,8 @@ struct HashLookup {
/// Input to groupProbe and joinProbe APIs.

/// Set of row numbers of row to probe.
raw_vector<vector_size_t> rows;
raw_vector<vector_size_t, AlignedStlAllocator<vector_size_t, simd::kPadding>>
rows;

/// Hashes or value IDs for rows in 'rows'. Not aligned with 'rows'. Index is
/// the row number.
Expand Down Expand Up @@ -152,7 +156,9 @@ class BaseHashTable {
return !rows || lastRowIndex == rows->size();
}

const raw_vector<vector_size_t>* rows{nullptr};
const raw_vector<
vector_size_t,
AlignedStlAllocator<vector_size_t, simd::kPadding>>* rows{nullptr};
const raw_vector<char*>* hits{nullptr};
vector_size_t lastRowIndex{0};
vector_size_t lastDuplicateRowIndex{0};
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/RowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ RowNumber::RowNumber(
false, // hasProbedFlag
0, // minTableSizeForParallelJoinBuild
pool());
lookup_ = std::make_unique<HashLookup>(table_->hashers());
lookup_ = std::make_unique<HashLookup>(
table_->hashers(), table_->stringAllocator());

const auto numRowsColumn = table_->rows()->columnAt(numKeys);
numRowsOffset_ = numRowsColumn.offset();
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/TopNRowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ TopNRowNumber::TopNRowNumber(
0, // minTableSizeForParallelJoinBuild
pool());
partitionOffset_ = table_->rows()->columnAt(numKeys).offset();
lookup_ = std::make_unique<HashLookup>(table_->hashers());
lookup_ = std::make_unique<HashLookup>(
table_->hashers(), table_->stringAllocator());
} else {
allocator_ = std::make_unique<HashStringAllocator>(pool());
singlePartition_ = std::make_unique<TopRows>(allocator_.get(), comparator_);
Expand Down
21 changes: 14 additions & 7 deletions velox/exec/tests/HashTableTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ class HashTableTest : public testing::TestWithParam<bool>,
int32_t sequence = 0;
std::vector<RowVectorPtr> batches;
auto table = createHashTableForAggregation(tableType, numKeys);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup = std::make_unique<HashLookup>(
table->hashers(), table->stringAllocator());
std::vector<char*> allInserted;
int32_t numErased = 0;
// We insert 1000 and delete 500.
Expand Down Expand Up @@ -446,7 +447,8 @@ class HashTableTest : public testing::TestWithParam<bool>,
}

void testProbe() {
auto lookup = std::make_unique<HashLookup>(topTable_->hashers());
auto lookup = std::make_unique<HashLookup>(
topTable_->hashers(), topTable_->stringAllocator());
const auto batchSize = batches_[0]->size();
SelectivityVector rows(batchSize);
const auto mode = topTable_->hashMode();
Expand Down Expand Up @@ -646,7 +648,8 @@ TEST_P(HashTableTest, bestWithReserveOverflow) {
ROW({"a", "b", "c", "d"}, {BIGINT(), BIGINT(), BIGINT(), BIGINT()});
const auto numKeys = 4;
auto table = createHashTableForAggregation(rowType, numKeys);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup =
std::make_unique<HashLookup>(table->hashers(), table->stringAllocator());

// Make sure rangesWithReserve overflows.
// Ranges for keys are: 200K, 200K, 200K, 100K.
Expand Down Expand Up @@ -707,7 +710,8 @@ TEST_P(HashTableTest, bestWithReserveOverflow) {
TEST_P(HashTableTest, enableRangeWhereCan) {
auto rowType = ROW({"a", "b", "c"}, {BIGINT(), VARCHAR(), VARCHAR()});
auto table = createHashTableForAggregation(rowType, 3);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup =
std::make_unique<HashLookup>(table->hashers(), table->stringAllocator());

// Generate 3 keys with the following ranges and number of distinct values
// (ndv):
Expand Down Expand Up @@ -746,7 +750,8 @@ TEST_P(HashTableTest, enableRangeWhereCan) {

TEST_P(HashTableTest, arrayProbeNormalizedKey) {
auto table = createHashTableForAggregation(ROW({"a"}, {BIGINT()}), 1);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup =
std::make_unique<HashLookup>(table->hashers(), table->stringAllocator());

for (auto i = 0; i < 200; ++i) {
auto data = makeRowVector({
Expand Down Expand Up @@ -804,7 +809,8 @@ TEST_P(HashTableTest, groupBySpill) {
TEST_P(HashTableTest, checkSizeValidation) {
auto rowType = ROW({"a"}, {BIGINT()});
auto table = createHashTableForAggregation(rowType, 1);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup =
std::make_unique<HashLookup>(table->hashers(), table->stringAllocator());
auto testHelper = HashTableTestHelper<false>::create(table.get());

// The initial set hash mode with table size of 256K entries.
Expand Down Expand Up @@ -941,7 +947,8 @@ TEST_P(HashTableTest, offsetOverflowLoadTags) {
auto rowType = ROW({"a"}, {BIGINT()});
auto table = createHashTableForAggregation(rowType, rowType->size());
table->hashMode();
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup =
std::make_unique<HashLookup>(table->hashers(), table->stringAllocator());
auto batchSize = 1 << 25;
for (auto i = 0; i < 64; ++i) {
std::vector<RowVectorPtr> batches;
Expand Down

0 comments on commit 33a6c05

Please sign in to comment.