Skip to content

Commit

Permalink
Add more size classes
Browse files Browse the repository at this point in the history
Experiment to see dynamics with 2, 4, 8, 16 MB classes added.
  • Loading branch information
Orri Erling committed Jun 11, 2024
1 parent 65bd5de commit 7390b9d
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 4 deletions.
1 change: 1 addition & 0 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ std::shared_ptr<MemoryAllocator> createAllocator(
if (options.useMmapAllocator) {
MmapAllocator::Options mmapOptions;
mmapOptions.capacity = options.allocatorCapacity;
mmapOptions.largestSizeClass = options.largestSizeClassPages;
mmapOptions.useMmapArena = options.useMmapArena;
mmapOptions.mmapArenaCapacityRatio = options.mmapArenaCapacityRatio;
return std::make_shared<MmapAllocator>(mmapOptions);
Expand Down
3 changes: 3 additions & 0 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ struct MemoryManagerOptions {
/// std::malloc.
bool useMmapAllocator{false};

// Number of pages in largest size class in MmapAllocator.
int32_t largestSizeClassPages{256};

/// If true, allocations larger than largest size class size will be delegated
/// to ManagedMmapArena. Otherwise a system mmap call will be issued for each
/// such allocation.
Expand Down
74 changes: 74 additions & 0 deletions velox/common/memory/MemoryAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "velox/common/memory/MallocAllocator.h"

#include <sys/mman.h>
#include <sys/resource.h>
#include <iostream>
#include <numeric>

Expand All @@ -28,6 +29,17 @@ DECLARE_bool(velox_memory_use_hugepages);

namespace facebook::velox::memory {

// static
std::vector<MachinePageCount> makeSizeClassSizes(MachinePageCount largest) {
VELOX_CHECK_LE(256, largest);
VELOX_CHECK_EQ(largest, bits::nextPowerOfTwo(largest));
std::vector<MachinePageCount> sizes;
for (auto size = 1; size <= largest; size *= 2) {
sizes.push_back(size);
}
return sizes;
}

namespace {
std::string& cacheFailureMessage() {
thread_local std::string message;
Expand Down Expand Up @@ -436,4 +448,66 @@ std::string MemoryAllocator::getAndClearFailureMessage() {
}
return allocatorErrMsg;
}

namespace {
struct TraceState {
struct rusage rusage;
Stats allocatorStats;
int64_t ioTotal;
struct timeval tv;
};

int64_t toUsec(struct timeval tv) {
return tv.tv_sec * 1000000LL + tv.tv_usec;
}

int32_t elapsedUsec(struct timeval end, struct timeval begin) {
return toUsec(end) - toUsec(begin);
}
} // namespace

void MemoryAllocator::getTracingFuncs(
std::function<void()>& init,
std::function<std::string()>& report,
std::function<int64_t()> ioVolume) {
auto allocator = shared_from_this();
auto state = std::make_shared<TraceState>();
init = [state, allocator, ioVolume]() {
getrusage(RUSAGE_SELF, &state->rusage);
struct timezone tz;
gettimeofday(&state->tv, &tz);
state->allocatorStats = allocator->stats();
state->ioTotal = ioVolume ? ioVolume() : 0;
};
report = [state, allocator, ioVolume]() -> std::string {
struct rusage rusage;
getrusage(RUSAGE_SELF, &rusage);
auto newStats = allocator->stats();
float u = elapsedUsec(rusage.ru_utime, state->rusage.ru_utime);
float s = elapsedUsec(rusage.ru_stime, state->rusage.ru_stime);
auto m = allocator->stats() - state->allocatorStats;
float flt = rusage.ru_minflt - state->rusage.ru_minflt;
struct timeval tv;
struct timezone tz;
gettimeofday(&tv, &tz);
float elapsed = elapsedUsec(tv, state->tv);
int64_t io = 0;
if (ioVolume) {
io = ioVolume() - state->ioTotal;
}
std::stringstream out;
out << std::endl
<< std::endl
<< fmt::format(
"user%={} sys%={} minflt/s={}, io={} MB/s\n",
100 * u / elapsed,
100 * s / elapsed,
flt / (elapsed / 1000000),
io / (elapsed));
out << m.toString() << std::endl;
out << allocator->toString() << std::endl;
return out.str();
};
}

} // namespace facebook::velox::memory
11 changes: 10 additions & 1 deletion velox/common/memory/MemoryAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,17 @@ class MemoryAllocator : public std::enable_shared_from_this<MemoryAllocator> {
/// thread. The message is cleared after return.
std::string getAndClearFailureMessage();

void getTracingFuncs(
std::function<void()>& init,
std::function<std::string()>& report,
std::function<int64_t()> ioVolume = nullptr);

protected:
explicit MemoryAllocator() = default;
MemoryAllocator(MachinePageCount largestSizeClassPages = 256)
: sizeClassSizes_(makeSizeClassSizes(largestSizeClassPages)) {}

static std::vector<MachinePageCount> makeSizeClassSizes(
MachinePageCount largest);

/// Represents a mix of blocks of different sizes for covering a single
/// allocation.
Expand Down
3 changes: 2 additions & 1 deletion velox/common/memory/MmapAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

namespace facebook::velox::memory {
MmapAllocator::MmapAllocator(const Options& options)
: kind_(MemoryAllocator::Kind::kMmap),
: MemoryAllocator(options.largestSizeClass),
kind_(MemoryAllocator::Kind::kMmap),
useMmapArena_(options.useMmapArena),
maxMallocBytes_(options.maxMallocBytes),
mallocReservedBytes_(
Expand Down
2 changes: 2 additions & 0 deletions velox/common/memory/MmapAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class MmapAllocator : public MemoryAllocator {
/// Capacity in bytes, default unlimited.
uint64_t capacity{kMaxMemory};

int32_t largestSizeClass{256};

/// If set true, allocations larger than largest size class size will be
/// delegated to ManagedMmapArena. Otherwise a system mmap call will be
/// issued for each such allocation.
Expand Down
16 changes: 15 additions & 1 deletion velox/common/process/Profiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ tsan_atomic<bool> Profiler::shouldSaveResult_;
int64_t Profiler::sampleStartTime_;
int64_t Profiler::cpuAtSampleStart_;
int64_t Profiler::cpuAtLastCheck_;
std::function<void()> Profiler::startExtra_;
std::function<std::string()> Profiler::extraReport_;

namespace {
std::string hostname;
Expand Down Expand Up @@ -158,6 +160,10 @@ void Profiler::copyToResult(const std::string* data) {
timeString(now),
100 * (cpu - cpuAtSampleStart_) / std::max<int64_t>(1, elapsed)));
out->append(std::string_view(buffer, resultSize));
if (extraReport_) {
std::string extra = extraReport_();
out->append(std::string_view(extra.data(), extra.size()));
}
out->flush();
LOG(INFO) << "PROFILE: Produced result " << target << " " << resultSize
<< " bytes";
Expand All @@ -177,6 +183,9 @@ void Profiler::makeProfileDir(std::string path) {
}

std::thread Profiler::startSample() {
if (startExtra_) {
startExtra_();
}
std::thread thread([&]() {
// We run perf under a shell because running it with fork + rexec
// and killing it with SIGINT produces a corrupt perf.data
Expand Down Expand Up @@ -295,12 +304,17 @@ bool Profiler::isRunning() {
return profileStarted_;
}

void Profiler::start(const std::string& path) {
void Profiler::start(
const std::string& path,
std::function<void()> extraStart,
std::function<std::string()> extraReport) {
{
#if !defined(linux)
VELOX_FAIL("Profiler is only available for Linux");
#endif
resultPath_ = path;
startExtra_ = extraStart;
extraReport_ = extraReport;
std::lock_guard<std::mutex> l(profileMutex_);
if (profileStarted_) {
return;
Expand Down
8 changes: 7 additions & 1 deletion velox/common/process/Profiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ namespace facebook::velox::process {
class Profiler {
public:
/// Starts periodic production of perf reports.
static void start(const std::string& path);
static void start(
const std::string& path,
std::function<void()> extraStart = nullptr,
std::function<std::string()> extraReport = nullptr);

// Stops profiling background associated threads. Threads are stopped on
// return.
Expand Down Expand Up @@ -69,6 +72,9 @@ class Profiler {

// CPU time at last periodic check.
static int64_t cpuAtLastCheck_;

static std::function<void()> startExtra_;
static std::function<std::string()> extraReport_;
};

} // namespace facebook::velox::process

0 comments on commit 7390b9d

Please sign in to comment.