[WIP] Use HashStringAllocator for raw_vector rows in HashLookup #10349

Draft · wants to merge 1 commit into base: main
59 changes: 30 additions & 29 deletions velox/common/base/RawVector.h
@@ -16,19 +16,35 @@

#pragma once

#include <iostream>

#include <folly/Range.h>
#include "velox/common/base/BitUtil.h"
#include "velox/common/base/SimdUtil.h"

namespace facebook::velox {

template <typename T, uint8_t kAlignment>
class StdAlignedAllocator {
public:
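// Note (editorial): C11 aligned_alloc requires n * sizeof(T) to be a
// multiple of kAlignment (glibc is more lenient). raw_vector passes
// SIMD-padded sizes, which satisfies this for power-of-two sizeof(T).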
T* allocate(size_t n) {
return reinterpret_cast<T*>(aligned_alloc(kAlignment, n * sizeof(T)));
}

void deallocate(T* data, size_t /*n*/) {
::free(data);
}
};

/// Class template similar to std::vector with no default construction and a
/// SIMD load worth of padding below and above the data. The idea is that one
/// can access the data at full SIMD width at both ends.
template <typename T>
template <
typename T,
typename Allocator = StdAlignedAllocator<T, simd::kPadding>>
class raw_vector {
public:
raw_vector() {
raw_vector(Allocator allocator = {}) : allocator_(std::move(allocator)) {
static_assert(std::is_trivially_destructible<T>::value);
}

@@ -42,35 +58,11 @@ class raw_vector {
}
}

// Constructs a copy of 'other'. See operator=. 'data_' must be copied.
raw_vector(const raw_vector<T>& other) {
*this = other;
}

raw_vector(raw_vector<T>&& other) noexcept {
*this = std::move(other);
}

// Moves 'other' to this, leaves 'other' empty, as after default
// construction.
void operator=(const raw_vector<T>& other) {
resize(other.size());
if (other.data_) {
memcpy(
data_,
other.data(),
bits::roundUp(size_ * sizeof(T), simd::kPadding));
}
}

void operator=(raw_vector<T>&& other) noexcept {
data_ = other.data_;
size_ = other.size_;
capacity_ = other.capacity_;
other.data_ = nullptr;
other.size_ = 0;
other.capacity_ = 0;
}
raw_vector& operator=(raw_vector<T>&& other) noexcept = default;

bool empty() const {
return size_ == 0;
@@ -152,6 +144,9 @@
}

private:
// Constructs a copy of 'other'. See operator=. 'data_' must be copied.
raw_vector(const raw_vector<T>& other) {}
Contributor:
We can use raw_vector(const raw_vector<T>& other) = delete; instead of a private constructor with an empty body. Also delete the copy assignment: void operator=(const raw_vector<T>& other) = delete;
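Spelled out, the suggestion amounts to this (a sketch of the reviewer's proposal; not code in this diff):

// Deleting both copy operations turns accidental copies into compile-time
// errors instead of silently producing an empty copy.
raw_vector(const raw_vector<T>& other) = delete;
void operator=(const raw_vector<T>& other) = delete;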


// Adds 'bytes' to the address 'pointer'.
inline T* addBytes(T* pointer, int32_t bytes) {
return reinterpret_cast<T*>(reinterpret_cast<uint64_t>(pointer) + bytes);
@@ -165,7 +160,11 @@

T* allocateData(int32_t size, int32_t& capacity) {
auto bytes = paddedSize(sizeof(T) * size);
auto ptr = reinterpret_cast<T*>(aligned_alloc(simd::kPadding, bytes));
// The allocator's allocate function takes a number of elements, not a
// number of bytes, so round the padded byte count up to a whole number of
// T's. Note std::ceil(bytes / sizeof(T)) would not work here: the integer
// division truncates before ceil runs.
auto numElementsOfT = (bytes + sizeof(T) - 1) / sizeof(T);
auto ptr = allocator_.allocate(numElementsOfT);
allocatedSize_ = numElementsOfT;
// Clear the word below the pointer so that we do not get a read of
// uninitialized memory when reading a partial word that extends below
// the pointer.
@@ -177,7 +176,7 @@

void freeData(T* data) {
if (data_) {
::free(addBytes(data, -simd::kPadding));
allocator_.deallocate(addBytes(data, -simd::kPadding), allocatedSize_);
}
}

@@ -193,6 +192,8 @@
T* data_{nullptr};
int32_t size_{0};
int32_t capacity_{0};
Allocator allocator_{};
size_t allocatedSize_{0};
};

// Returns a pointer to 'size' int32_t's with consecutive values
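For reviewers new to raw_vector's padding contract, here is a minimal sketch (assuming xsimd batch loads, which Velox's SimdUtil wraps) of why a full-width load at the tail stays in bounds:

#include <numeric>

#include <xsimd/xsimd.hpp>

#include "velox/common/base/RawVector.h"

using namespace facebook::velox;

// Demonstration only: the vector keeps simd::kPadding bytes below and above
// its data, so a SIMD-width load starting at the last element cannot run
// off the allocation, even though only its first lane holds live data.
void tailLoadIsSafe() {
  raw_vector<int32_t> v(100);
  std::iota(v.begin(), v.end(), 0);
  auto tail = xsimd::batch<int32_t>::load_unaligned(v.data() + v.size() - 1);
  (void)tail;  // Only lane 0 is meaningful; the rest reads padding.
}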
17 changes: 6 additions & 11 deletions velox/common/base/tests/RawVectorTest.cpp
@@ -57,23 +57,18 @@ TEST(RawVectorTest, resize) {
}

TEST(RawVectorTest, copyAndMove) {
const auto kNumRows = 1000;
raw_vector<int32_t> ints(1000);
const auto kDataSize = kNumRows * sizeof(int32_t);
// A raw_vector's contents are intentionally left uninitialized.
memset(ints.data(), 11, ints.size() * sizeof(int32_t));
ints[ints.size() - 1] = 12345;
raw_vector<int32_t> intsCopy(ints);
EXPECT_EQ(
0, memcmp(ints.data(), intsCopy.data(), ints.size() * sizeof(int32_t)));
memset(ints.data(), 11, kDataSize);
ints[kNumRows - 1] = 12345;
auto data = ints.data();

raw_vector<int32_t> intsMoved(std::move(ints));
EXPECT_TRUE(ints.empty());

EXPECT_EQ(
0,
memcmp(
intsMoved.data(),
intsCopy.data(),
intsCopy.size() * sizeof(int32_t)));
EXPECT_EQ(0, memcmp(intsMoved.data(), data, kDataSize));
}

TEST(RawVectorTest, iota) {
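The rewritten copyAndMove test above drops its copy-based assertions because raw_vector is no longer copyable; only the moved-to contents are checked. A sketch of what stops compiling under the reviewer's suggested deleted-copy variant (hypothetical snippet, not in this diff):

raw_vector<int32_t> a(10);
raw_vector<int32_t> b(a);             // error: copy constructor is deleted
raw_vector<int32_t> c(std::move(a));  // still fine: move remains available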
5 changes: 3 additions & 2 deletions velox/exec/GroupingSet.cpp
@@ -397,7 +397,8 @@ void GroupingSet::createHashTable() {
}
}

lookup_ = std::make_unique<HashLookup>(table_->hashers());
lookup_ = std::make_unique<HashLookup>(
table_->hashers(), table_->stringAllocatorShared());
if (!isAdaptive_ && table_->hashMode() != BaseHashTable::HashMode::kHash) {
table_->forceGenericHashMode(BaseHashTable::kNoSpillInputStartPartitionBit);
}
@@ -408,7 +409,7 @@ void GroupingSet::initializeGlobalAggregation() {
return;
}

lookup_ = std::make_unique<HashLookup>(hashers_);
lookup_ = std::make_unique<HashLookup>(hashers_, table_->stringAllocatorShared());
lookup_->reset(1);

// Row layout is:
4 changes: 2 additions & 2 deletions velox/exec/HashProbe.cpp
@@ -157,8 +157,6 @@ void HashProbe::initialize() {
keyChannels_.push_back(hasher->channel());
}

VELOX_CHECK_NULL(lookup_);
lookup_ = std::make_unique<HashLookup>(hashers_);
auto buildType = joinNode_->sources()[1]->outputType();
auto tableType = makeTableType(buildType.get(), joinNode_->rightKeys());
if (joinNode_->filter()) {
@@ -349,6 +347,8 @@ void HashProbe::asyncWaitForHashTable() {

VELOX_CHECK_NOT_NULL(table_);

VELOX_CHECK_NULL(lookup_);
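// Editorial note: lookup_ creation moves here from initialize() because it
// now needs table_->stringAllocatorShared(), and table_ is only available
// once the build side has arrived (see VELOX_CHECK_NOT_NULL above).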
lookup_ = std::make_unique<HashLookup>(hashers_, table_->stringAllocatorShared());
maybeSetupSpillInputReader(hashBuildResult->restoredPartitionId);
maybeSetupInputSpiller(hashBuildResult->spillPartitionIds);
checkMaxSpillLevel(hashBuildResult->restoredPartitionId);
4 changes: 2 additions & 2 deletions velox/exec/HashTable.cpp
@@ -2006,7 +2006,7 @@ int32_t HashTable<false>::listNullKeyRows(
if (!iter->initialized) {
VELOX_CHECK_GT(nextOffset_, 0);
VELOX_CHECK_EQ(hashers_.size(), 1);
HashLookup lookup(hashers_);
HashLookup lookup(hashers_, stringAllocatorShared());
if (hashMode_ == HashMode::kHash) {
lookup.hashes.push_back(VectorHasher::kNullHash);
} else {
@@ -2160,7 +2160,7 @@ template class HashTable<false>;
namespace {
void populateLookupRows(
const SelectivityVector& rows,
raw_vector<vector_size_t>& lookupRows) {
HsaRawVector<vector_size_t>& lookupRows) {
if (rows.isAllSelected()) {
std::iota(lookupRows.begin(), lookupRows.end(), 0);
} else {
28 changes: 24 additions & 4 deletions velox/exec/HashTable.h
@@ -53,10 +53,21 @@ struct TableInsertPartitionInfo {
}
};

template <typename T>
using HsaRawVector = raw_vector<T, AlignedStlAllocator<T, simd::kPadding>>;

/// Contains input and output parameters for groupProbe and joinProbe APIs.
struct HashLookup {
explicit HashLookup(const std::vector<std::unique_ptr<VectorHasher>>& h)
: hashers(h) {}
HashLookup(
const std::vector<std::unique_ptr<VectorHasher>>& h,
const std::shared_ptr<HashStringAllocator>& hsa)
: hashers(h),
hsa_(hsa),
rows(AlignedStlAllocator<vector_size_t, simd::kPadding>(hsa_.get())) {
std::cout << "HSA use count = " << hsa_.use_count() << std::endl;
}

void reset(vector_size_t size) {
rows.resize(size);
@@ -71,10 +82,13 @@
/// Scratch memory used to call VectorHasher::lookupValueIds.
VectorHasher::ScratchMemory scratchMemory;

/// Keeps the HashStringAllocator alive for as long as the raw_vectors below
/// use its storage.
std::shared_ptr<HashStringAllocator> hsa_;

/// Input to groupProbe and joinProbe APIs.

/// Set of row numbers of rows to probe.
raw_vector<vector_size_t> rows;
HsaRawVector<vector_size_t> rows;

/// Hashes or value IDs for rows in 'rows'. Not aligned with 'rows'. Index is
/// the row number.
Collaborator:
There are a bunch more raw_vector members below this in HashLookup (hashes, hits, newGroups, normalizedKeys). All of these are also proportional to the number of input rows in addInput(), so we should use the HashStringAllocator for them as well.

Collaborator (Author):
Right, I started with just one. More changes would be needed to specify the templated type in various places. I started on that after pushing this PR and will continue making the changes and trying them out.
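For context, the follow-up the thread describes would look roughly like this (hypothetical continuation, not part of this diff; member types are assumptions based on the reviewer's list):

// Hypothetical follow-up: route the remaining per-row scratch vectors in
// HashLookup through the same allocator-backed alias, each constructed like
// 'rows' from the shared HashStringAllocator.
HsaRawVector<uint64_t> hashes;
HsaRawVector<vector_size_t> newGroups;
HsaRawVector<char*> hits;
HsaRawVector<uint64_t> normalizedKeys;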

@@ -163,7 +177,7 @@ class BaseHashTable {
/// fixed sized.
const uint64_t fixedSizeListColumnsSizeSum{0};

const raw_vector<vector_size_t>* rows{nullptr};
const HsaRawVector<vector_size_t>* rows{nullptr};
const raw_vector<char*>* hits{nullptr};

vector_size_t lastRowIndex{0};
@@ -199,6 +213,8 @@

virtual HashStringAllocator* stringAllocator() = 0;

virtual const std::shared_ptr<HashStringAllocator>& stringAllocatorShared() = 0;

/// Populates 'hashes' and 'rows' fields in 'lookup' in preparation for
/// 'groupProbe' call. Rehashes the table if necessary. Uses lookup.hashes to
/// decode grouping keys from 'input'. If 'ignoreNullKeys' is true, updates
@@ -541,6 +557,10 @@ class HashTable : public BaseHashTable {
return &rows_->stringAllocator();
}

const std::shared_ptr<HashStringAllocator>& stringAllocatorShared() override {
return rows_->stringAllocatorShared();
}

uint64_t capacity() const override {
return capacity_;
}
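The shared_ptr handoff above is the heart of the lifetime story: a lookup's vectors must not outlive the arena backing them. A self-contained toy analogue of the pattern (simplified stand-in types, not Velox code):

#include <memory>
#include <vector>

struct Arena {
  // Stands in for HashStringAllocator's backing storage.
  std::vector<char> storage;
};

struct Lookup {
  // Stands in for HashLookup: copies the shared_ptr, so the arena lives at
  // least as long as this object and any memory carved from it.
  explicit Lookup(std::shared_ptr<Arena> a) : arena(std::move(a)) {}
  std::shared_ptr<Arena> arena;
};

int main() {
  auto arena = std::make_shared<Arena>();
  Lookup lookup{arena};
  arena.reset();  // The table side drops its reference...
  // ...but lookup.arena still holds one, so the storage is freed only when
  // 'lookup' is destroyed.
  return 0;
}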
3 changes: 2 additions & 1 deletion velox/exec/RowNumber.cpp
@@ -48,7 +48,8 @@ RowNumber::RowNumber(
false, // hasProbedFlag
0, // minTableSizeForParallelJoinBuild
pool());
lookup_ = std::make_unique<HashLookup>(table_->hashers());
lookup_ = std::make_unique<HashLookup>(
table_->hashers(), table_->stringAllocatorShared());

const auto numRowsColumn = table_->rows()->columnAt(numKeys);
numRowsOffset_ = numRowsColumn.offset();
3 changes: 2 additions & 1 deletion velox/exec/TopNRowNumber.cpp
@@ -164,7 +164,8 @@ TopNRowNumber::TopNRowNumber(
0, // minTableSizeForParallelJoinBuild
pool());
partitionOffset_ = table_->rows()->columnAt(numKeys).offset();
lookup_ = std::make_unique<HashLookup>(table_->hashers());
lookup_ = std::make_unique<HashLookup>(
table_->hashers(), table_->stringAllocatorShared());
} else {
allocator_ = std::make_unique<HashStringAllocator>(pool());
singlePartition_ = std::make_unique<TopRows>(allocator_.get(), comparator_);
21 changes: 14 additions & 7 deletions velox/exec/tests/HashTableTest.cpp
@@ -198,7 +198,8 @@ class HashTableTest : public testing::TestWithParam<bool>,
int32_t sequence = 0;
std::vector<RowVectorPtr> batches;
auto table = createHashTableForAggregation(tableType, numKeys);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup = std::make_unique<HashLookup>(
table->hashers(), table->stringAllocatorShared());
std::vector<char*> allInserted;
int32_t numErased = 0;
// We insert 1000 and delete 500.
@@ -455,7 +456,8 @@
}

void testProbe() {
auto lookup = std::make_unique<HashLookup>(topTable_->hashers());
auto lookup = std::make_unique<HashLookup>(
topTable_->hashers(), topTable_->stringAllocatorShared());
const auto batchSize = batches_[0]->size();
SelectivityVector rows(batchSize);
const auto mode = topTable_->hashMode();
@@ -695,7 +697,8 @@ TEST_P(HashTableTest, bestWithReserveOverflow) {
ROW({"a", "b", "c", "d"}, {BIGINT(), BIGINT(), BIGINT(), BIGINT()});
const auto numKeys = 4;
auto table = createHashTableForAggregation(rowType, numKeys);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup =
std::make_unique<HashLookup>(table->hashers(), table->stringAllocatorShared());

// Make sure rangesWithReserve overflows.
// Ranges for keys are: 200K, 200K, 200K, 100K.
@@ -756,7 +759,8 @@ TEST_P(HashTableTest, bestWithReserveOverflow) {
TEST_P(HashTableTest, enableRangeWhereCan) {
auto rowType = ROW({"a", "b", "c"}, {BIGINT(), VARCHAR(), VARCHAR()});
auto table = createHashTableForAggregation(rowType, 3);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup =
std::make_unique<HashLookup>(table->hashers(), table->stringAllocatorShared());

// Generate 3 keys with the following ranges and number of distinct values
// (ndv):
@@ -795,7 +799,8 @@

TEST_P(HashTableTest, arrayProbeNormalizedKey) {
auto table = createHashTableForAggregation(ROW({"a"}, {BIGINT()}), 1);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup =
std::make_unique<HashLookup>(table->hashers(), table->stringAllocatorShared());

for (auto i = 0; i < 200; ++i) {
auto data = makeRowVector({
@@ -950,7 +955,8 @@ TEST_P(HashTableTest, groupBySpill) {
TEST_P(HashTableTest, checkSizeValidation) {
auto rowType = ROW({"a"}, {BIGINT()});
auto table = createHashTableForAggregation(rowType, 1);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup =
std::make_unique<HashLookup>(table->hashers(), table->stringAllocatorShared());
auto testHelper = HashTableTestHelper<false>::create(table.get());

// The initial set hash mode with table size of 256K entries.
@@ -1087,7 +1093,8 @@ TEST_P(HashTableTest, offsetOverflowLoadTags) {
auto rowType = ROW({"a"}, {BIGINT()});
auto table = createHashTableForAggregation(rowType, rowType->size());
table->hashMode();
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup =
std::make_unique<HashLookup>(table->hashers(), table->stringAllocatorShared());
auto batchSize = 1 << 25;
for (auto i = 0; i < 64; ++i) {
std::vector<RowVectorPtr> batches;