Skip to content

Commit

Permalink
[VL] Daily update velox version 08-08 (#6752)
Browse files Browse the repository at this point in the history
(cherry picked from commit ec62a76)
  • Loading branch information
jinchengchenghh authored and liuxiang71 committed Nov 12, 2024
1 parent bbd5776 commit 1aa8a56
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 29 deletions.
2 changes: 1 addition & 1 deletion cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) {
SuspendedSection suspender;
velox::exec::MemoryReclaimer::Stats status;
auto* mm = memoryManager_->getMemoryManager();
uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); // this conducts spilling
uint64_t spilledOut = mm->arbitrator()->shrinkCapacity(remaining); // this conducts spilling
LOG(INFO) << logPrefix << "Successfully spilled out " << spilledOut << " bytes.";
uint64_t total = shrunken + spilledOut;
VLOG(2) << logPrefix << "Successfully reclaimed total " << total << " bytes.";
Expand Down
89 changes: 61 additions & 28 deletions cpp/velox/memory/VeloxMemoryManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,60 +35,89 @@ namespace gluten {

using namespace facebook;

namespace {

static constexpr std::string_view kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"};
static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20};
static constexpr std::string_view kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"};
static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20};

template <typename T>
T getConfig(
const std::unordered_map<std::string, std::string>& configs,
const std::string_view& key,
const T& defaultValue) {
if (configs.count(std::string(key)) > 0) {
try {
return folly::to<T>(configs.at(std::string(key)));
} catch (const std::exception& e) {
VELOX_USER_FAIL("Failed while parsing SharedArbitrator configs: {}", e.what());
}
}
return defaultValue;
}
} // namespace
/// We assume in a single Spark task. No thread-safety should be guaranteed.
class ListenableArbitrator : public velox::memory::MemoryArbitrator {
public:
ListenableArbitrator(const Config& config, AllocationListener* listener)
: MemoryArbitrator(config), listener_(listener) {}

: MemoryArbitrator(config),
listener_(listener),
memoryPoolInitialCapacity_(
getConfig<uint64_t>(config.extraConfigs, kMemoryPoolInitialCapacity, kDefaultMemoryPoolInitialCapacity)),
memoryPoolTransferCapacity_(
getConfig<uint64_t>(config.extraConfigs, kMemoryPoolTransferCapacity, kDefaultMemoryPoolTransferCapacity)) {
}
std::string kind() const override {
return kind_;
}

uint64_t growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
listener_->allocationChanged(targetBytes);
if (!growPool(pool, targetBytes, 0)) {
VELOX_FAIL("Failed to grow root pool's capacity for {}", velox::succinctBytes(targetBytes));
}
return targetBytes;
void addPool(const std::shared_ptr<velox::memory::MemoryPool>& pool) override {
VELOX_CHECK_EQ(pool->capacity(), 0);

std::unique_lock guard{mutex_};
VELOX_CHECK_EQ(candidates_.count(pool.get()), 0);
candidates_.emplace(pool.get(), pool->weak_from_this());
}

uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
return shrinkCapacity0(pool, targetBytes);
void removePool(velox::memory::MemoryPool* pool) override {
VELOX_CHECK_EQ(pool->reservedBytes(), 0);
shrinkCapacity(pool, pool->capacity());

std::unique_lock guard{mutex_};
const auto ret = candidates_.erase(pool);
VELOX_CHECK_EQ(ret, 1);
}

bool growCapacity(
velox::memory::MemoryPool* pool,
const std::vector<std::shared_ptr<velox::memory::MemoryPool>>& candidatePools,
uint64_t targetBytes) override {
bool growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
velox::memory::ScopedMemoryArbitrationContext ctx(pool);
std::shared_ptr<facebook::velox::memory::MemoryPool> candidate;
velox::memory::MemoryPool* pool;
{
std::unique_lock guard{mutex_};
VELOX_CHECK_EQ(candidatePools.size(), 1, "ListenableArbitrator should only be used within a single root pool")
candidate = candidatePools.back();
VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only be used within a single root pool")
candidate = candidates_.begin()->first;
}
VELOX_CHECK(pool->root() == candidate.get(), "Illegal state in ListenableArbitrator");
VELOX_CHECK(pool->root() == candidate, "Illegal state in ListenableArbitrator");

growCapacity0(pool->root(), targetBytes);
return true;
}

uint64_t shrinkCapacity(
const std::vector<std::shared_ptr<velox::memory::MemoryPool>>& pools,
uint64_t targetBytes,
bool allowSpill,
bool allowAbort) override {
uint64_t shrinkCapacity(uint64_t targetBytes, bool allowSpill, bool allowAbort) override {
velox::memory::ScopedMemoryArbitrationContext ctx((const velox::memory::MemoryPool*)nullptr);
facebook::velox::exec::MemoryReclaimer::Stats status;
std::shared_ptr<velox::memory::MemoryPool> pool;
velox::memory::MemoryPool* pool;
{
std::unique_lock guard{mutex_};
VELOX_CHECK_EQ(pools.size(), 1, "ListenableArbitrator should only be used within a single root pool")
pool = pools.at(0);
VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only be used within a single root pool")
pool = candidates_.begin()->first;
}
pool->reclaim(targetBytes, 0, status); // ignore the output
return shrinkCapacity0(pool.get(), 0);
return shrinkCapacity0(pool, 0);
}

uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
return shrinkCapacity0(pool, targetBytes);
}

Stats stats() const override {
Expand Down Expand Up @@ -130,8 +159,12 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator {
}

gluten::AllocationListener* listener_;
std::mutex mutex_;
const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused.
const uint64_t memoryPoolTransferCapacity_;

mutable std::mutex mutex_;
inline static std::string kind_ = "GLUTEN";
std::unordered_map<velox::memory::MemoryPool*, std::weak_ptr<velox::memory::MemoryPool>> candidates_;
};

class ArbitratorFactoryRegister {
Expand Down

0 comments on commit 1aa8a56

Please sign in to comment.