Skip to content

Commit

Permalink
Add metric snap_blocking_time_ms
Browse files Browse the repository at this point in the history
  • Loading branch information
JackyWoo committed May 9, 2024
1 parent 031e35e commit 87af8f3
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 64 deletions.
2 changes: 0 additions & 2 deletions src/Service/FourLetterCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,6 @@ String MonitorCommand::run()
print(ret, "watch_count", state_machine.getTotalWatchesCount());
print(ret, "ephemerals_count", state_machine.getTotalEphemeralNodesCount());
print(ret, "approximate_data_size", state_machine.getApproximateDataSize());
print(ret, "snap_count", state_machine.getSnapshotCount());
print(ret, "snap_time_ms", state_machine.getSnapshotTimeMs());
print(ret, "in_snapshot", state_machine.getSnapshoting());

#if defined(__linux__) || defined(__APPLE__)
Expand Down
17 changes: 15 additions & 2 deletions src/Service/Metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ Strings AdvanceSummary::values() const
return results;
}

/// Renders this summary as a single line of the form "zk_<name>\t<sum>",
/// where <sum> is the current value of the atomic accumulator.
Strings SimpleSummary::values() const
{
    Strings lines;
    lines.push_back(fmt::format("zk_{}\t{}", name, sum.load()));
    return lines;
}

void BasicSummary::add(RK::UInt64 value)
{
UInt64 current;
Expand Down Expand Up @@ -117,17 +124,23 @@ Metrics::Metrics()
apply_read_request_time_ms = getSummary("apply_read_request_time_ms", SummaryLevel::ADVANCED);
read_latency = getSummary("readlatency", SummaryLevel::ADVANCED);
update_latency = getSummary("updatelatency", SummaryLevel::ADVANCED);

snap_time_ms = getSummary("snap_time_ms", SummaryLevel::SIMPLE);
snap_blocking_time_ms = getSummary("snap_blocking_time_ms", SummaryLevel::SIMPLE);
snap_count = getSummary("snap_count", SummaryLevel::SIMPLE);
}

SummaryPtr Metrics::getSummary(const RK::String & name, RK::SummaryLevel detailLevel)
SummaryPtr Metrics::getSummary(const RK::String & name, RK::SummaryLevel level)
{
SummaryPtr summary;

if (summaries.contains(name))
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Already registered summary {} ", name);

if (detailLevel == SummaryLevel::BASIC)
if (level == SummaryLevel::SIMPLE)
summary = std::make_shared<SimpleSummary>(name);
else if (level == SummaryLevel::BASIC)
summary = std::make_shared<BasicSummary>(name);
else
summary = std::make_shared<AdvanceSummary>(name);
Expand Down
85 changes: 56 additions & 29 deletions src/Service/Metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,55 +62,47 @@ class Summary
enum SummaryLevel
{
/**
* The returned Summary is expected to track only simple aggregated
* values, like min/max/avg
*/
* The returned Summary is expected to track only sum of values
*/
SIMPLE,

/**
* The returned Summary is expected to track only simple aggregated
* values, like min/max/avg
*/
BASIC,

/**
* It is expected that the returned Summary performs expensive
* aggregations, like percentiles.
*/
* It is expected that the returned Summary performs expensive
* aggregations, like percentiles.
*/
ADVANCED
};


class AdvanceSummary : public Summary
class SimpleSummary : public Summary
{
public:
AdvanceSummary(const String & name_): name(name_)
{
}

void reset() override
explicit SimpleSummary(const String & name_): name(name_)
{
count.store(0);
sum.store(0);
reservoir_sampler.reset();
}

void add(UInt64 value) override
{
count ++;
sum += value;
reservoir_sampler.update(value);
}
Strings values() const override;

static double getValue(const std::vector<UInt64>& numbers, double quantile);
void add(RK::UInt64 value) override { sum += value; }
UInt64 getSum() const { return sum.load(); }

Strings values() const override;
void reset() override { sum.store(0); }

private:
String name;
ReservoirSampler reservoir_sampler;
std::atomic<UInt64> count{0};
std::atomic<UInt64> sum{0};
};


class BasicSummary : public Summary
{
public:
BasicSummary(const String & name_): name(name_)
explicit BasicSummary(const String & name_): name(name_)
{
}

Expand Down Expand Up @@ -148,6 +140,38 @@ class BasicSummary : public Summary
std::atomic<UInt64> max{0};
};

class AdvanceSummary : public Summary
{
public:
explicit AdvanceSummary(const String & name_): name(name_)
{
}

void reset() override
{
count.store(0);
sum.store(0);
reservoir_sampler.reset();
}

void add(UInt64 value) override
{
count ++;
sum += value;
reservoir_sampler.update(value);
}

static double getValue(const std::vector<UInt64>& numbers, double quantile);

Strings values() const override;

private:
String name;
ReservoirSampler reservoir_sampler;
std::atomic<UInt64> count{0};
std::atomic<UInt64> sum{0};
};

/** Implements summary metrics for RK.
* There is a possible race condition, but we don't need the stats to be extremely accurate.
*/
Expand All @@ -156,7 +180,7 @@ class Metrics
public:
using SummaryPtr = std::shared_ptr<Summary>;

static Metrics& getMetrics()
static Metrics & getMetrics()
{
static Metrics metrics;
return metrics;
Expand All @@ -176,10 +200,13 @@ class Metrics
SummaryPtr apply_read_request_time_ms;
SummaryPtr read_latency;
SummaryPtr update_latency;
SummaryPtr snap_time_ms;
SummaryPtr snap_blocking_time_ms;
SummaryPtr snap_count;

private:
Metrics();
SummaryPtr getSummary(const String & name, SummaryLevel detailLevel);
SummaryPtr getSummary(const String & name, SummaryLevel level);

std::map<String, SummaryPtr> summaries;
};
Expand Down
2 changes: 2 additions & 0 deletions src/Service/NuRaftLogSnapshot.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <Service/SnapshotCommon.h>
#include <Service/Metrics.h>
#include <Common/Stopwatch.h>


Expand Down Expand Up @@ -36,6 +37,7 @@ struct SnapTask
Stopwatch watch;
buckets_nodes = store.dumpDataTree();
LOG_INFO(log, "Dump dataTree costs {}ms", watch.elapsedMilliseconds());
Metrics::getMetrics().snap_blocking_time_ms->add(watch.elapsedMilliseconds());

for (auto && bucket : *buckets_nodes)
{
Expand Down
21 changes: 8 additions & 13 deletions src/Service/NuRaftStateMachine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ void NuRaftStateMachine::snapThread()
{
if (snap_task_ready)
{
Stopwatch stopwatch;
auto current_task = std::move(snap_task);
snap_task_ready = false;

Expand All @@ -165,15 +164,13 @@ void NuRaftStateMachine::snapThread()

current_task->when_done(ret, except);

stopwatch.stop();

snap_count.fetch_add(1);
snap_time_ms.fetch_add(stopwatch.elapsedMilliseconds());
Metrics::getMetrics().snap_count->add(1);
Metrics::getMetrics().snap_time_ms->add(Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time);

last_snapshot_time = Poco::Timestamp().epochMicroseconds();
in_snapshot = false;

LOG_INFO(log, "Create snapshot time cost {} ms", stopwatch.elapsedMilliseconds());
LOG_INFO(log, "Create snapshot time cost {} ms", Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time);
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
Expand Down Expand Up @@ -432,8 +429,8 @@ void NuRaftStateMachine::create_snapshot(snapshot & s, async_result<bool>::handl
}
}

Stopwatch stopwatch;
in_snapshot = true;
snap_start_time = Poco::Timestamp().epochMicroseconds() / 1000;

LOG_INFO(log, "Creating snapshot last_log_term {}, last_log_idx {}", s.get_last_log_term(), s.get_last_log_idx());

Expand All @@ -444,15 +441,13 @@ void NuRaftStateMachine::create_snapshot(snapshot & s, async_result<bool>::handl
bool ret = true;
when_done(ret, except);

stopwatch.stop();

snap_count.fetch_add(1);
snap_time_ms.fetch_add(stopwatch.elapsedMilliseconds());
Metrics::getMetrics().snap_count->add(1);
Metrics::getMetrics().snap_time_ms->add(Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time);

last_snapshot_time = Poco::Timestamp().epochMicroseconds();
in_snapshot = false;

LOG_INFO(log, "Created snapshot, time cost {} us", stopwatch.elapsedMicroseconds());
LOG_INFO(log, "Created snapshot, time cost {} ms", Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time);
}
else
{
Expand All @@ -462,7 +457,7 @@ void NuRaftStateMachine::create_snapshot(snapshot & s, async_result<bool>::handl
snap_task = std::make_shared<SnapTask>(snap_copy, store, when_done);
snap_task_ready = true;

LOG_INFO(log, "Scheduling asynchronous creating snapshot task, time cost {} us", stopwatch.elapsedMicroseconds());
LOG_INFO(log, "Scheduling asynchronous creating snapshot task, time cost {} ms", Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time);
}
}

Expand Down
19 changes: 1 addition & 18 deletions src/Service/NuRaftStateMachine.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,18 +271,6 @@ class NuRaftStateMachine : public nuraft::state_machine
/// and follower only contains local session.
bool containsSession(int64_t session_id) const;

/// Number of snapshots created during the lifetime of this process.
uint64_t getSnapshotCount() const
{
    return snap_count.load();
}

/// Accumulated time, in milliseconds, spent creating snapshots during the lifetime of this process.
uint64_t getSnapshotTimeMs() const
{
    return snap_time_ms.load();
}

/// Whether snapshot creation is currently in progress.
bool getSnapshoting() const
{
Expand Down Expand Up @@ -344,15 +332,10 @@ class NuRaftStateMachine : public nuraft::state_machine
/// When get a not exist node, return blank.
KeeperNode default_node;

/// Number of snapshots created during the process lifetime.
std::atomic_int64_t snap_count{0};

/// Total time, in milliseconds, spent creating snapshots during the process lifetime.
std::atomic_int64_t snap_time_ms{0};

/// Whether snapshot creation is currently in progress.
std::atomic_bool in_snapshot = false;
std::atomic_bool snap_task_ready{false};
/// Start time of the current snapshot, in milliseconds since the epoch.
/// Written in create_snapshot() and subtracted from the current time when the snapshot finishes.
/// Explicitly zero-initialized, like the neighbouring atomics, so it never holds an
/// indeterminate value (a default-constructed std::atomic is uninitialized before C++20).
std::atomic_uint64_t snap_start_time{0};

ThreadFromGlobalPool snap_thread;

Expand Down
4 changes: 4 additions & 0 deletions tests/integration/test_four_word_command/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml', 'configs/logs_conf.xml'],
stay_alive=True)

# Metric names grouped by how test_cmd_mntr expects them to appear in the command result.
# Simple metrics are looked up under a single `zk_<name>` key.
simple_metrics = ["snap_time_ms", "snap_blocking_time_ms", "snap_count"]
# Basic metrics are looked up under prefixed keys such as `zk_avg_<name>` and `zk_min_<name>`.
basic_metrics = ["log_replication_batch_size"]
# NOTE(review): the assertions for advance metrics are not visible here - confirm which keys are expected.
advance_metrics = ["apply_read_request_time_ms", "apply_write_request_time_ms", "push_request_queue_time_ms", "readlatency", "updatelatency", ]

Expand Down Expand Up @@ -194,6 +195,9 @@ def test_cmd_mntr(started_cluster):
assert int(result["zk_packets_sent"]) >= 31
assert int(result["zk_packets_received"]) >= 31

for metric in simple_metrics:
assert (f'zk_{metric}' in result)

for metric in basic_metrics:
assert (f'zk_avg_{metric}' in result)
assert (f'zk_min_{metric}' in result)
Expand Down

0 comments on commit 87af8f3

Please sign in to comment.