Skip to content

Commit

Permalink
Add global access utility to PeriodicStatsReporter (#9696)
Browse files Browse the repository at this point in the history
Summary:
We only need 1 PeriodicStatsReporter instance globally. Adding utility to achieve global singleton.

Pull Request resolved: #9696

Reviewed By: xiaoxmeng

Differential Revision: D56918350

Pulled By: tanjialiang

fbshipit-source-id: 55505d468bec300e1a0d840956813451b9c86f05
  • Loading branch information
tanjialiang authored and facebook-github-bot committed May 3, 2024
1 parent 238830b commit 5bc6818
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 8 deletions.
34 changes: 30 additions & 4 deletions velox/common/base/PeriodicStatsReporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,38 @@ namespace {
if ((counter) != 0) { \
RECORD_METRIC_VALUE((name), (counter)); \
}

std::mutex& instanceMutex() {
static std::mutex instanceMu;
return instanceMu;
}

// Global instance. Must be called while holding a lock over instanceMutex().
std::unique_ptr<PeriodicStatsReporter>& instance() {
static std::unique_ptr<PeriodicStatsReporter> reporter;
return reporter;
}
} // namespace

PeriodicStatsReporter::PeriodicStatsReporter(
const velox::memory::MemoryArbitrator* arbitrator,
const Options& options)
: arbitrator_(arbitrator), options_(options) {}
void startPeriodicStatsReporter(const PeriodicStatsReporter::Options& options) {
std::lock_guard<std::mutex> l(instanceMutex());
auto& instanceRef = instance();
VELOX_CHECK_NULL(
instanceRef, "The periodic stats reporter has already started.");
instanceRef = std::make_unique<PeriodicStatsReporter>(options);
instanceRef->start();
}

void stopPeriodicStatsReporter() {
std::lock_guard<std::mutex> l(instanceMutex());
auto& instanceRef = instance();
VELOX_CHECK_NOT_NULL(instanceRef, "No periodic stats reporter to stop.");
instanceRef->stop();
instanceRef.reset();
}

PeriodicStatsReporter::PeriodicStatsReporter(const Options& options)
: arbitrator_(options.arbitrator), options_(options) {}

void PeriodicStatsReporter::start() {
LOG(INFO) << "Starting PeriodicStatsReporter with options "
Expand Down
15 changes: 12 additions & 3 deletions velox/common/base/PeriodicStatsReporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class PeriodicStatsReporter {
struct Options {
Options() {}

const memory::MemoryArbitrator* arbitrator{nullptr};

uint64_t arbitratorStatsIntervalMs{60'000};

std::string toString() const {
Expand All @@ -47,9 +49,7 @@ class PeriodicStatsReporter {
}
};

PeriodicStatsReporter(
const velox::memory::MemoryArbitrator* arbitrator,
const Options& options = Options());
PeriodicStatsReporter(const Options& options = Options());

/// Invoked to start the report daemon in background.
void start();
Expand Down Expand Up @@ -83,4 +83,13 @@ class PeriodicStatsReporter {

folly::ThreadedRepeatingFunctionRunner scheduler_;
};

/// Initializes and starts the process-wide periodic stats reporter. Before
/// 'stopPeriodicStatsReporter()' is called, this method can only be called once
/// process-wide, and additional calls to this method will throw.
void startPeriodicStatsReporter(const PeriodicStatsReporter::Options& options);

/// Stops the process-wide periodic stats reporter.
void stopPeriodicStatsReporter();

} // namespace facebook::velox
18 changes: 17 additions & 1 deletion velox/common/base/tests/StatsReporterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <unordered_set>
#include "velox/common/base/Counters.h"
#include "velox/common/base/PeriodicStatsReporter.h"
#include "velox/common/base/tests/GTestUtils.h"

namespace facebook::velox {

Expand Down Expand Up @@ -196,8 +197,9 @@ class TestStatsReportMemoryArbitrator : public memory::MemoryArbitrator {
TEST_F(PeriodicStatsReporterTest, basic) {
TestStatsReportMemoryArbitrator arbitrator({});
PeriodicStatsReporter::Options options;
options.arbitrator = &arbitrator;
options.arbitratorStatsIntervalMs = 4'000;
PeriodicStatsReporter periodicReporter(&arbitrator, options);
PeriodicStatsReporter periodicReporter(options);

periodicReporter.start();
std::this_thread::sleep_for(std::chrono::milliseconds(2'000));
Expand All @@ -212,6 +214,20 @@ TEST_F(PeriodicStatsReporterTest, basic) {
counterMap.count(kMetricArbitratorFreeReservedCapacityBytes.str()), 1);
}

TEST_F(PeriodicStatsReporterTest, globalInstance) {
TestStatsReportMemoryArbitrator arbitrator({});
PeriodicStatsReporter::Options options;
options.arbitrator = &arbitrator;
options.arbitratorStatsIntervalMs = 4'000;
VELOX_ASSERT_THROW(
stopPeriodicStatsReporter(), "No periodic stats reporter to stop.");
ASSERT_NO_THROW(startPeriodicStatsReporter(options));
VELOX_ASSERT_THROW(
startPeriodicStatsReporter(options),
"The periodic stats reporter has already started.");
ASSERT_NO_THROW(stopPeriodicStatsReporter());
}

// Registering to folly Singleton with intended reporter type
folly::Singleton<BaseStatsReporter> reporter([]() {
return new TestReporter();
Expand Down

0 comments on commit 5bc6818

Please sign in to comment.