diff --git a/src/Service/FourLetterCommand.cpp b/src/Service/FourLetterCommand.cpp index 18247fc6bd..024b18dd7b 100644 --- a/src/Service/FourLetterCommand.cpp +++ b/src/Service/FourLetterCommand.cpp @@ -258,8 +258,6 @@ String MonitorCommand::run() print(ret, "watch_count", state_machine.getTotalWatchesCount()); print(ret, "ephemerals_count", state_machine.getTotalEphemeralNodesCount()); print(ret, "approximate_data_size", state_machine.getApproximateDataSize()); - print(ret, "snap_count", state_machine.getSnapshotCount()); - print(ret, "snap_time_ms", state_machine.getSnapshotTimeMs()); print(ret, "in_snapshot", state_machine.getSnapshoting()); #if defined(__linux__) || defined(__APPLE__) diff --git a/src/Service/Metrics.cpp b/src/Service/Metrics.cpp index 80f2d38200..41a2e55cc4 100644 --- a/src/Service/Metrics.cpp +++ b/src/Service/Metrics.cpp @@ -80,6 +80,13 @@ Strings AdvanceSummary::values() const return results; } +Strings SimpleSummary::values() const +{ + Strings results; + results.emplace_back(fmt::format("zk_{}\t{}", name, sum.load())); + return results; +} + void BasicSummary::add(RK::UInt64 value) { UInt64 current; @@ -117,9 +124,13 @@ Metrics::Metrics() apply_read_request_time_ms = getSummary("apply_read_request_time_ms", SummaryLevel::ADVANCED); read_latency = getSummary("readlatency", SummaryLevel::ADVANCED); update_latency = getSummary("updatelatency", SummaryLevel::ADVANCED); + + snap_time_ms = getSummary("snap_time_ms", SummaryLevel::SIMPLE); + snap_blocking_time_ms = getSummary("snap_blocking_time_ms", SummaryLevel::SIMPLE); + snap_count = getSummary("snap_count", SummaryLevel::SIMPLE); } -SummaryPtr Metrics::getSummary(const RK::String & name, RK::SummaryLevel detailLevel) +SummaryPtr Metrics::getSummary(const RK::String & name, RK::SummaryLevel level) { SummaryPtr summary; @@ -127,7 +138,9 @@ SummaryPtr Metrics::getSummary(const RK::String & name, RK::SummaryLevel detailL throw Exception( ErrorCodes::BAD_ARGUMENTS, "Already registered summary {} ", name); - if (detailLevel == SummaryLevel::BASIC) + if (level == SummaryLevel::SIMPLE) + summary = std::make_shared(name); + else if (level == SummaryLevel::BASIC) summary = std::make_shared(name); else summary = std::make_shared(name); diff --git a/src/Service/Metrics.h b/src/Service/Metrics.h index 02449e4565..005fc0322d 100644 --- a/src/Service/Metrics.h +++ b/src/Service/Metrics.h @@ -62,55 +62,47 @@ class Summary enum SummaryLevel { /** - * The returned Summary is expected to track only simple aggregated - * values, like min/max/avg - */ + * The returned Summary is expected to track only sum of values + */ + SIMPLE, + + /** + * The returned Summary is expected to track only simple aggregated + * values, like min/max/avg + */ BASIC, + /** - * It is expected that the returned Summary performs expensive - * aggregations, like percentiles. - */ + * It is expected that the returned Summary performs expensive + * aggregations, like percentiles. + */ ADVANCED }; -class AdvanceSummary : public Summary +class SimpleSummary : public Summary { public: - AdvanceSummary(const String & name_): name(name_) - { - } - - void reset() override + explicit SimpleSummary(const String & name_): name(name_) { - count.store(0); - sum.store(0); - reservoir_sampler.reset(); } - void add(UInt64 value) override - { - count ++; - sum += value; - reservoir_sampler.update(value); - } + Strings values() const override; - static double getValue(const std::vector& numbers, double quantile); + void add(RK::UInt64 value) override { sum += value; } + UInt64 getSum() const { return sum.load(); } - Strings values() const override; + void reset() override { sum.store(0); } private: String name; - ReservoirSampler reservoir_sampler; - std::atomic count{0}; std::atomic sum{0}; }; - class BasicSummary : public Summary { public: - BasicSummary(const String & name_): name(name_) + explicit BasicSummary(const String & name_): name(name_) { } @@ -148,6 +140,38 @@ class BasicSummary : public Summary std::atomic max{0}; }; +class AdvanceSummary : public Summary +{ +public: + explicit AdvanceSummary(const String & name_): name(name_) + { + } + + void reset() override + { + count.store(0); + sum.store(0); + reservoir_sampler.reset(); + } + + void add(UInt64 value) override + { + count ++; + sum += value; + reservoir_sampler.update(value); + } + + static double getValue(const std::vector& numbers, double quantile); + + Strings values() const override; + +private: + String name; + ReservoirSampler reservoir_sampler; + std::atomic count{0}; + std::atomic sum{0}; +}; + /** Implements Summary Metrics for RK. * There is possible race-condition, but we don't need the stats to be extremely accurate. */ @@ -156,7 +180,7 @@ class Metrics public: using SummaryPtr = std::shared_ptr; - static Metrics& getMetrics() + static Metrics & getMetrics() { static Metrics metrics; return metrics; @@ -176,10 +200,13 @@ class Metrics SummaryPtr apply_read_request_time_ms; SummaryPtr read_latency; SummaryPtr update_latency; + SummaryPtr snap_time_ms; + SummaryPtr snap_blocking_time_ms; + SummaryPtr snap_count; private: Metrics(); - SummaryPtr getSummary(const String & name, SummaryLevel detailLevel); + SummaryPtr getSummary(const String & name, SummaryLevel level); std::map summaries; }; diff --git a/src/Service/NuRaftLogSnapshot.h b/src/Service/NuRaftLogSnapshot.h index d15e3313a9..a53502eb27 100644 --- a/src/Service/NuRaftLogSnapshot.h +++ b/src/Service/NuRaftLogSnapshot.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include @@ -36,6 +37,7 @@ struct SnapTask Stopwatch watch; buckets_nodes = store.dumpDataTree(); LOG_INFO(log, "Dump dataTree costs {}ms", watch.elapsedMilliseconds()); + Metrics::getMetrics().snap_blocking_time_ms->add(watch.elapsedMilliseconds()); for (auto && bucket : *buckets_nodes) { diff --git a/src/Service/NuRaftStateMachine.cpp b/src/Service/NuRaftStateMachine.cpp index efd5f4986d..f297fa2091 100644 --- a/src/Service/NuRaftStateMachine.cpp +++ b/src/Service/NuRaftStateMachine.cpp @@ -149,7 +149,6 @@ void NuRaftStateMachine::snapThread() { if (snap_task_ready) { - Stopwatch stopwatch; auto current_task = std::move(snap_task); snap_task_ready = false; @@ -165,15 +164,13 @@ void NuRaftStateMachine::snapThread() current_task->when_done(ret, except); - stopwatch.stop(); - - snap_count.fetch_add(1); - snap_time_ms.fetch_add(stopwatch.elapsedMilliseconds()); + Metrics::getMetrics().snap_count->add(1); + Metrics::getMetrics().snap_time_ms->add(Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time); last_snapshot_time = Poco::Timestamp().epochMicroseconds(); in_snapshot = false; - LOG_INFO(log, "Create snapshot time cost {} ms", stopwatch.elapsedMilliseconds()); + LOG_INFO(log, "Create snapshot time cost {} ms", Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time); } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } @@ -432,8 +429,8 @@ void NuRaftStateMachine::create_snapshot(snapshot & s, async_result::handl } } - Stopwatch stopwatch; in_snapshot = true; + snap_start_time = Poco::Timestamp().epochMicroseconds() / 1000; LOG_INFO(log, "Creating snapshot last_log_term {}, last_log_idx {}", s.get_last_log_term(), s.get_last_log_idx()); @@ -444,15 +441,13 @@ void NuRaftStateMachine::create_snapshot(snapshot & s, async_result::handl bool ret = true; when_done(ret, except); - stopwatch.stop(); - - snap_count.fetch_add(1); - snap_time_ms.fetch_add(stopwatch.elapsedMilliseconds()); + Metrics::getMetrics().snap_count->add(1); + Metrics::getMetrics().snap_time_ms->add(Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time); last_snapshot_time = Poco::Timestamp().epochMicroseconds(); in_snapshot = false; - LOG_INFO(log, "Created snapshot, time cost {} us", stopwatch.elapsedMicroseconds()); + LOG_INFO(log, "Created snapshot, time cost {} ms", Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time); } else { @@ -462,7 +457,7 @@ void NuRaftStateMachine::create_snapshot(snapshot & s, async_result::handl snap_task = std::make_shared(snap_copy, store, when_done); snap_task_ready = true; - LOG_INFO(log, "Scheduling asynchronous creating snapshot task, time cost {} us", stopwatch.elapsedMicroseconds()); + LOG_INFO(log, "Scheduling asynchronous creating snapshot task, time cost {} ms", Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time); } } diff --git a/src/Service/NuRaftStateMachine.h b/src/Service/NuRaftStateMachine.h index 6b7912634d..0c37b58d7c 100644 --- a/src/Service/NuRaftStateMachine.h +++ b/src/Service/NuRaftStateMachine.h @@ -271,18 +271,6 @@ class NuRaftStateMachine : public nuraft::state_machine /// and follower only contains local session. bool containsSession(int64_t session_id) const; - /// how many snapshot created in process life time. - uint64_t getSnapshotCount() const - { - return snap_count; - } - - /// how many times cost byt snapshot creating in process life time. - uint64_t getSnapshotTimeMs() const - { - return snap_time_ms; - } - /// whether a snapshot creating is in progress. bool getSnapshoting() const { @@ -344,15 +332,10 @@ class NuRaftStateMachine : public nuraft::state_machine /// When get a not exist node, return blank. KeeperNode default_node; - /// how many snapshot created in process life time. - std::atomic_int64_t snap_count{0}; - - /// how many times cost byt snapshot creating in process life time. - std::atomic_int64_t snap_time_ms{0}; - /// whether a snapshot creating is in progress. std::atomic_bool in_snapshot = false; std::atomic_bool snap_task_ready{false}; + std::atomic_uint64_t snap_start_time; ThreadFromGlobalPool snap_thread; diff --git a/tests/integration/test_four_word_command/test.py b/tests/integration/test_four_word_command/test.py index a98f4172d9..54c56a5011 100644 --- a/tests/integration/test_four_word_command/test.py +++ b/tests/integration/test_four_word_command/test.py @@ -17,6 +17,7 @@ node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml', 'configs/logs_conf.xml'], stay_alive=True) +simple_metrics = ["snap_time_ms", "snap_blocking_time_ms", "snap_count"] basic_metrics = ["log_replication_batch_size"] advance_metrics = ["apply_read_request_time_ms", "apply_write_request_time_ms", "push_request_queue_time_ms", "readlatency", "updatelatency", ] @@ -194,6 +195,9 @@ def test_cmd_mntr(started_cluster): assert int(result["zk_packets_sent"]) >= 31 assert int(result["zk_packets_received"]) >= 31 + for metric in simple_metrics: + assert (f'zk_{metric}' in result) + for metric in basic_metrics: assert (f'zk_avg_{metric}' in result) assert (f'zk_min_{metric}' in result)