diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index 68c3fbc5c5ec..af6c0193fc55 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -49,6 +49,7 @@ std::shared_ptr createAllocator( if (options.useMmapAllocator) { MmapAllocator::Options mmapOptions; mmapOptions.capacity = options.allocatorCapacity; + mmapOptions.largestSizeClass = options.largestSizeClassPages; mmapOptions.useMmapArena = options.useMmapArena; mmapOptions.mmapArenaCapacityRatio = options.mmapArenaCapacityRatio; return std::make_shared(mmapOptions); diff --git a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index c40de3933324..fa5c5cec0814 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -92,6 +92,9 @@ struct MemoryManagerOptions { /// std::malloc. bool useMmapAllocator{false}; + // Number of pages in largest size class in MmapAllocator. + int32_t largestSizeClassPages{256}; + /// If true, allocations larger than largest size class size will be delegated /// to ManagedMmapArena. Otherwise a system mmap call will be issued for each /// such allocation. diff --git a/velox/common/memory/MemoryAllocator.cpp b/velox/common/memory/MemoryAllocator.cpp index e4dd46457b3a..448e9305207d 100644 --- a/velox/common/memory/MemoryAllocator.cpp +++ b/velox/common/memory/MemoryAllocator.cpp @@ -18,6 +18,7 @@ #include "velox/common/memory/MallocAllocator.h" #include +#include #include #include @@ -28,6 +29,17 @@ DECLARE_bool(velox_memory_use_hugepages); namespace facebook::velox::memory { +// static +std::vector makeSizeClassSizes(MachinePageCount largest) { + VELOX_CHECK_LE(256, largest); + VELOX_CHECK_EQ(largest, bits::nextPowerOfTwo(largest)); + std::vector sizes; + for (auto size = 1; size <= largest; size *= 2) { + sizes.push_back(size); + } + return sizes; +} + namespace { std::string& cacheFailureMessage() { thread_local std::string message; @@ -436,4 +448,66 @@ std::string MemoryAllocator::getAndClearFailureMessage() { } return allocatorErrMsg; } + +namespace { +struct TraceState { + struct rusage rusage; + Stats allocatorStats; + int64_t ioTotal; + struct timeval tv; +}; + +int64_t toUsec(struct timeval tv) { + return tv.tv_sec * 1000000LL + tv.tv_usec; +} + +int32_t elapsedUsec(struct timeval end, struct timeval begin) { + return toUsec(end) - toUsec(begin); +} +} // namespace + +void MemoryAllocator::getTracingFuncs( + std::function& init, + std::function& report, + std::function ioVolume) { + auto allocator = shared_from_this(); + auto state = std::make_shared(); + init = [state, allocator, ioVolume]() { + getrusage(RUSAGE_SELF, &state->rusage); + struct timezone tz; + gettimeofday(&state->tv, &tz); + state->allocatorStats = allocator->stats(); + state->ioTotal = ioVolume ? ioVolume() : 0; + }; + report = [state, allocator, ioVolume]() -> std::string { + struct rusage rusage; + getrusage(RUSAGE_SELF, &rusage); + auto newStats = allocator->stats(); + float u = elapsedUsec(rusage.ru_utime, state->rusage.ru_utime); + float s = elapsedUsec(rusage.ru_stime, state->rusage.ru_stime); + auto m = allocator->stats() - state->allocatorStats; + float flt = rusage.ru_minflt - state->rusage.ru_minflt; + struct timeval tv; + struct timezone tz; + gettimeofday(&tv, &tz); + float elapsed = elapsedUsec(tv, state->tv); + int64_t io = 0; + if (ioVolume) { + io = ioVolume() - state->ioTotal; + } + std::stringstream out; + out << std::endl + << std::endl + << fmt::format( + "user%={} sys%={} minflt/s={}, io={} MB/s\n", + 100 * u / elapsed, + 100 * s / elapsed, + flt / (elapsed / 1000000), + io / (elapsed)); + out << m.toString() << std::endl; + out << allocator->toString() << std::endl; + return out.str(); + }; +} + } // namespace facebook::velox::memory diff --git a/velox/common/memory/MemoryAllocator.h b/velox/common/memory/MemoryAllocator.h index 41508b63aad0..9eab2da71ac1 100644 --- a/velox/common/memory/MemoryAllocator.h +++ b/velox/common/memory/MemoryAllocator.h @@ -400,8 +400,17 @@ class MemoryAllocator : public std::enable_shared_from_this { /// thread. The message is cleared after return. std::string getAndClearFailureMessage(); + void getTracingFuncs( + std::function& init, + std::function& report, + std::function ioVolume = nullptr); + protected: - explicit MemoryAllocator() = default; + MemoryAllocator(MachinePageCount largestSizeClassPages = 256) + : sizeClassSizes_(makeSizeClassSizes(largestSizeClassPages)) {} + + static std::vector makeSizeClassSizes( + MachinePageCount largest); /// Represents a mix of blocks of different sizes for covering a single /// allocation. diff --git a/velox/common/memory/MmapAllocator.cpp b/velox/common/memory/MmapAllocator.cpp index 2a4f26174ab2..8ab440e21252 100644 --- a/velox/common/memory/MmapAllocator.cpp +++ b/velox/common/memory/MmapAllocator.cpp @@ -25,7 +25,8 @@ namespace facebook::velox::memory { MmapAllocator::MmapAllocator(const Options& options) - : kind_(MemoryAllocator::Kind::kMmap), + : MemoryAllocator(options.largestSizeClass), + kind_(MemoryAllocator::Kind::kMmap), useMmapArena_(options.useMmapArena), maxMallocBytes_(options.maxMallocBytes), mallocReservedBytes_( diff --git a/velox/common/memory/MmapAllocator.h b/velox/common/memory/MmapAllocator.h index 9013817c36a4..fd17ee8fa765 100644 --- a/velox/common/memory/MmapAllocator.h +++ b/velox/common/memory/MmapAllocator.h @@ -55,6 +55,8 @@ class MmapAllocator : public MemoryAllocator { /// Capacity in bytes, default unlimited. uint64_t capacity{kMaxMemory}; + int32_t largestSizeClass{256}; + /// If set true, allocations larger than largest size class size will be /// delegated to ManagedMmapArena. Otherwise a system mmap call will be /// issued for each such allocation. diff --git a/velox/common/process/Profiler.cpp b/velox/common/process/Profiler.cpp index 7575b6417071..e8e31202ee80 100644 --- a/velox/common/process/Profiler.cpp +++ b/velox/common/process/Profiler.cpp @@ -66,6 +66,8 @@ tsan_atomic Profiler::shouldSaveResult_; int64_t Profiler::sampleStartTime_; int64_t Profiler::cpuAtSampleStart_; int64_t Profiler::cpuAtLastCheck_; +std::function Profiler::startExtra_; +std::function Profiler::extraReport_; namespace { std::string hostname; @@ -158,6 +160,10 @@ void Profiler::copyToResult(const std::string* data) { timeString(now), 100 * (cpu - cpuAtSampleStart_) / std::max(1, elapsed))); out->append(std::string_view(buffer, resultSize)); + if (extraReport_) { + std::string extra = extraReport_(); + out->append(std::string_view(extra.data(), extra.size())); + } out->flush(); LOG(INFO) << "PROFILE: Produced result " << target << " " << resultSize << " bytes"; @@ -177,6 +183,9 @@ void Profiler::makeProfileDir(std::string path) { } std::thread Profiler::startSample() { + if (startExtra_) { + startExtra_(); + } std::thread thread([&]() { // We run perf under a shell because running it with fork + rexec // and killing it with SIGINT produces a corrupt perf.data @@ -295,12 +304,17 @@ bool Profiler::isRunning() { return profileStarted_; } -void Profiler::start(const std::string& path) { +void Profiler::start( + const std::string& path, + std::function extraStart, + std::function extraReport) { { #if !defined(linux) VELOX_FAIL("Profiler is only available for Linux"); #endif resultPath_ = path; + startExtra_ = extraStart; + extraReport_ = extraReport; std::lock_guard l(profileMutex_); if (profileStarted_) { return; diff --git a/velox/common/process/Profiler.h b/velox/common/process/Profiler.h index 48309c41e28c..425f56bc8618 100644 --- a/velox/common/process/Profiler.h +++ b/velox/common/process/Profiler.h @@ -29,7 +29,10 @@ namespace facebook::velox::process { class Profiler { public: /// Starts periodic production of perf reports. - static void start(const std::string& path); + static void start( + const std::string& path, + std::function extraStart = nullptr, + std::function extraReport = nullptr); // Stops profiling background associated threads. Threads are stopped on // return. @@ -69,6 +72,9 @@ class Profiler { // CPU time at last periodic check. static int64_t cpuAtLastCheck_; + + static std::function startExtra_; + static std::function extraReport_; }; } // namespace facebook::velox::process