diff --git a/core/monitor/Monitor.cpp b/core/monitor/Monitor.cpp index 0fa2b2a4e3..e1e3ae3c47 100644 --- a/core/monitor/Monitor.cpp +++ b/core/monitor/Monitor.cpp @@ -13,6 +13,8 @@ // limitations under the License. #include "Monitor.h" + +#include "MetricRecord.h" #if defined(__linux__) #include #include @@ -177,7 +179,9 @@ void LogtailMonitor::Monitor() { lastCheckHardLimitTime = monitorTime; GetMemStat(); + LoongCollectorMonitor::GetInstance()->SetAgentMemory(mMemStat.mRss); CalCpuStat(curCpuStat, mCpuStat); + LoongCollectorMonitor::GetInstance()->SetAgentCpu(mCpuStat.mCpuUsage); if (CheckHardMemLimit()) { LOG_ERROR(sLogger, ("Resource used by program exceeds hard limit", @@ -247,10 +251,6 @@ bool LogtailMonitor::SendStatusProfile(bool suicide) { sleep(10); _exit(1); } - // CPU usage of Logtail process. - LoongCollectorMonitor::GetInstance()->SetAgentCpu(mCpuStat.mCpuUsage); - // Memory usage of Logtail process. - LoongCollectorMonitor::GetInstance()->SetAgentMemory(mMemStat.mRss); return mIsThreadRunning; } @@ -635,4 +635,35 @@ void LoongCollectorMonitor::Stop() { LOG_INFO(sLogger, ("LoongCollector monitor", "stopped successfully")); } +bool LoongCollectorMonitor::GetAgentMetric(SelfMonitorMetricEvent& event) { + lock_guard lock(mGlobalMetricsMux); + event = mGlobalMetrics.mAgentMetric; + return true; +} + +void LoongCollectorMonitor::SetAgentMetric(const SelfMonitorMetricEvent& event) { + lock_guard lock(mGlobalMetricsMux); + mGlobalMetrics.mAgentMetric = event; +} + +bool LoongCollectorMonitor::GetRunnerMetric(const std::string& runnerName, SelfMonitorMetricEvent& event) { + if (runnerName.empty()) { + return false; + } + lock_guard lock(mGlobalMetricsMux); + if (mGlobalMetrics.mRunnerMetrics.find(runnerName) != mGlobalMetrics.mRunnerMetrics.end()) { + event = mGlobalMetrics.mRunnerMetrics[runnerName]; + return true; + } + return false; +} + +void LoongCollectorMonitor::SetRunnerMetric(const std::string& runnerName, const SelfMonitorMetricEvent& event) { + if (runnerName.empty()) { + return; + } + lock_guard lock(mGlobalMetricsMux); + mGlobalMetrics.mRunnerMetrics[runnerName] = event; +} + } // namespace logtail diff --git a/core/monitor/Monitor.h b/core/monitor/Monitor.h index a76acd6008..6fff5c6508 100644 --- a/core/monitor/Monitor.h +++ b/core/monitor/Monitor.h @@ -20,8 +20,10 @@ #include #include #include +#include #include "MetricManager.h" +#include "MetricTypes.h" #if defined(_MSC_VER) #include @@ -188,6 +190,11 @@ class LoongCollectorMonitor { void Init(); void Stop(); + bool GetAgentMetric(SelfMonitorMetricEvent& event); + void SetAgentMetric(const SelfMonitorMetricEvent& event); + bool GetRunnerMetric(const std::string& runnerName, SelfMonitorMetricEvent& event); + void SetRunnerMetric(const std::string& runnerName, const SelfMonitorMetricEvent& event); + void SetAgentCpu(double cpu) { SET_GAUGE(mAgentCpu, cpu); } void SetAgentMemory(uint64_t mem) { SET_GAUGE(mAgentMemory, mem); } void SetAgentGoMemory(uint64_t mem) { SET_GAUGE(mAgentGoMemory, mem); } @@ -214,6 +221,16 @@ class LoongCollectorMonitor { LoongCollectorMonitor(); ~LoongCollectorMonitor(); + // 一个全局级别指标的副本,由 SelfMonitorServer::PushSelfMonitorMetricEvents 更新,格式为: + // {MetricCategory: {key:MetricValue}} + // 现支持 Agent 和 Runner 指标的保存、获取 + struct GlobalMetrics { + SelfMonitorMetricEvent mAgentMetric; + std::unordered_map mRunnerMetrics; + }; + std::mutex mGlobalMetricsMux; + GlobalMetrics mGlobalMetrics; + // MetricRecord MetricsRecordRef mMetricsRecordRef; diff --git a/core/monitor/SelfMonitorServer.cpp b/core/monitor/SelfMonitorServer.cpp index c7fd5692ae..00b55a9411 100644 --- a/core/monitor/SelfMonitorServer.cpp +++ b/core/monitor/SelfMonitorServer.cpp @@ -16,7 +16,8 @@ #include "monitor/SelfMonitorServer.h" -#include "common/LogtailCommonFlags.h" +#include "MetricConstants.h" +#include "Monitor.h" #include "runner/ProcessorRunner.h" using namespace std; @@ -139,8 +140,10 @@ void SelfMonitorServer::PushSelfMonitorMetricEvents(std::vectorSetAgentMetric(event); shouldSkip = !ProcessSelfMonitorMetricEvent(event, mSelfMonitorMetricRules->mAgentMetricsRule); } else if (event.mCategory == MetricCategory::METRIC_CATEGORY_RUNNER) { + LoongCollectorMonitor::GetInstance()->SetRunnerMetric(event.GetLabel(METRIC_LABEL_KEY_RUNNER_NAME), event); shouldSkip = !ProcessSelfMonitorMetricEvent(event, mSelfMonitorMetricRules->mRunnerMetricsRule); } else if (event.mCategory == MetricCategory::METRIC_CATEGORY_COMPONENT) { shouldSkip = !ProcessSelfMonitorMetricEvent(event, mSelfMonitorMetricRules->mComponentMetricsRule); @@ -157,7 +160,7 @@ void SelfMonitorServer::PushSelfMonitorMetricEvents(std::vectorGetCategory(); @@ -131,14 +128,14 @@ void SelfMonitorMetricEvent::CreateKey() { } void SelfMonitorMetricEvent::SetInterval(size_t interval) { - mLastSendInterval = 0; + mIntervalsSinceLastSend = 0; mSendInterval = interval; } -void SelfMonitorMetricEvent::Merge(SelfMonitorMetricEvent& event) { +void SelfMonitorMetricEvent::Merge(const SelfMonitorMetricEvent& event) { if (mSendInterval != event.mSendInterval) { mSendInterval = event.mSendInterval; - mLastSendInterval = 0; + mIntervalsSinceLastSend = 0; } for (auto counter = event.mCounters.begin(); counter != event.mCounters.end(); counter++) { if (mCounters.find(counter->first) != mCounters.end()) @@ -153,12 +150,12 @@ void SelfMonitorMetricEvent::Merge(SelfMonitorMetricEvent& event) { } bool SelfMonitorMetricEvent::ShouldSend() { - mLastSendInterval++; - return (mLastSendInterval >= mSendInterval) && mUpdatedFlag; + mIntervalsSinceLastSend++; + return (mIntervalsSinceLastSend >= mSendInterval) && mUpdatedFlag; } bool SelfMonitorMetricEvent::ShouldDelete() { - return (mLastSendInterval >= mSendInterval) && !mUpdatedFlag; + return (mIntervalsSinceLastSend >= mSendInterval) && !mUpdatedFlag; } void SelfMonitorMetricEvent::ReadAsMetricEvent(MetricEvent* metricEventPtr) { @@ -182,8 +179,29 @@ void SelfMonitorMetricEvent::ReadAsMetricEvent(MetricEvent* metricEventPtr) { gauge->first, {UntypedValueMetricType::MetricTypeGauge, gauge->second}); } // set flags - mLastSendInterval = 0; + mIntervalsSinceLastSend = 0; mUpdatedFlag = false; } +std::string SelfMonitorMetricEvent::GetLabel(const std::string& labelKey) { + if (mLabels.find(labelKey) != mLabels.end()) { + return mLabels.at(labelKey); + } + return ""; +} + +uint64_t SelfMonitorMetricEvent::GetCounter(const std::string& counterName) { + if (mCounters.find(counterName) != mCounters.end()) { + return mCounters.at(counterName); + } + return 0; +} + +double SelfMonitorMetricEvent::GetGauge(const std::string& gaugeName) { + if (mGauges.find(gaugeName) != mGauges.end()) { + return mGauges.at(gaugeName); + } + return 0; +} + } // namespace logtail diff --git a/core/monitor/metric_models/SelfMonitorMetricEvent.h b/core/monitor/metric_models/SelfMonitorMetricEvent.h index 98b185aea9..902857975e 100644 --- a/core/monitor/metric_models/SelfMonitorMetricEvent.h +++ b/core/monitor/metric_models/SelfMonitorMetricEvent.h @@ -15,8 +15,10 @@ */ #pragma once -#include "MetricRecord.h" -#include "models/PipelineEventGroup.h" +#include + +#include "models/MetricEvent.h" +#include "monitor/metric_models/MetricRecord.h" namespace logtail { @@ -37,17 +39,24 @@ struct SelfMonitorMetricRules { using SelfMonitorMetricEventKey = int64_t; class SelfMonitorMetricEvent { public: - SelfMonitorMetricEvent(); + SelfMonitorMetricEvent() = default; + SelfMonitorMetricEvent(const SelfMonitorMetricEvent& event) = default; + SelfMonitorMetricEvent(MetricsRecord* metricRecord); SelfMonitorMetricEvent(const std::map& metricRecord); void SetInterval(size_t interval); - void Merge(SelfMonitorMetricEvent& event); + void Merge(const SelfMonitorMetricEvent& event); bool ShouldSend(); bool ShouldDelete(); void ReadAsMetricEvent(MetricEvent* metricEventPtr); + // 调用的对象应是不再修改的只读对象,不用加锁 + std::string GetLabel(const std::string& labelKey); + uint64_t GetCounter(const std::string& counterName); + double GetGauge(const std::string& gaugeName); + SelfMonitorMetricEventKey mKey; // labels + category std::string mCategory; // category private: @@ -57,7 +66,7 @@ class SelfMonitorMetricEvent { std::unordered_map mCounters; std::unordered_map mGauges; int32_t mSendInterval; - int32_t mLastSendInterval; + int32_t mIntervalsSinceLastSend; bool mUpdatedFlag; #ifdef APSARA_UNIT_TEST_MAIN diff --git a/core/unittest/monitor/SelfMonitorMetricEventUnittest.cpp b/core/unittest/monitor/SelfMonitorMetricEventUnittest.cpp index 91f1712bcd..669c103fbf 100644 --- a/core/unittest/monitor/SelfMonitorMetricEventUnittest.cpp +++ b/core/unittest/monitor/SelfMonitorMetricEventUnittest.cpp @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "MetricConstants.h" +#include "MetricRecord.h" +#include "Monitor.h" #include "monitor/MetricManager.h" #include "monitor/metric_models/SelfMonitorMetricEvent.h" #include "unittest/Unittest.h" @@ -27,6 +30,7 @@ class SelfMonitorMetricEventUnittest : public ::testing::Test { void TestCreateFromGoMetricMap(); void TestMerge(); void TestSendInterval(); + void TestGlobalMetrics(); private: std::shared_ptr mSourceBuffer; @@ -38,6 +42,7 @@ APSARA_UNIT_TEST_CASE(SelfMonitorMetricEventUnittest, TestCreateFromMetricEvent, APSARA_UNIT_TEST_CASE(SelfMonitorMetricEventUnittest, TestCreateFromGoMetricMap, 1); APSARA_UNIT_TEST_CASE(SelfMonitorMetricEventUnittest, TestMerge, 2); APSARA_UNIT_TEST_CASE(SelfMonitorMetricEventUnittest, TestSendInterval, 3); +APSARA_UNIT_TEST_CASE(SelfMonitorMetricEventUnittest, TestGlobalMetrics, 4); void SelfMonitorMetricEventUnittest::TestCreateFromMetricEvent() { std::vector> labels; @@ -165,7 +170,7 @@ void SelfMonitorMetricEventUnittest::TestMerge() { event1.Merge(event2); // 检验间隔是否被设置为 event2 的间隔 - APSARA_TEST_EQUAL(0, event1.mLastSendInterval); + APSARA_TEST_EQUAL(0, event1.mIntervalsSinceLastSend); APSARA_TEST_EQUAL(10, event1.mSendInterval); // 检验计数器是否正确合并 APSARA_TEST_EQUAL(300, event1.mCounters["counter1"]); @@ -198,6 +203,59 @@ void SelfMonitorMetricEventUnittest::TestSendInterval() { APSARA_TEST_TRUE(event.ShouldDelete()); // 第三次调用,间隔计数达到3,应返回true } +void SelfMonitorMetricEventUnittest::TestGlobalMetrics() { + { // test set/get agent metric + SelfMonitorMetricEvent originAgentEvent; + SelfMonitorMetricEvent wantAgentEvent; + + // set + originAgentEvent.mCategory = MetricCategory::METRIC_CATEGORY_AGENT; + originAgentEvent.mLabels = {{METRIC_LABEL_KEY_PROJECT, "test_project"}, {METRIC_LABEL_KEY_OS, "Linux"}}; + originAgentEvent.mCounters = {{"test_counter", 1}}; + originAgentEvent.mGauges = {{METRIC_AGENT_CPU, 0.3}, {METRIC_AGENT_MEMORY, 99}}; + LoongCollectorMonitor::GetInstance()->SetAgentMetric(originAgentEvent); + + // get + APSARA_TEST_TRUE(LoongCollectorMonitor::GetInstance()->GetAgentMetric(wantAgentEvent)); + APSARA_TEST_EQUAL(MetricCategory::METRIC_CATEGORY_AGENT, wantAgentEvent.mCategory); + APSARA_TEST_EQUAL("test_project", wantAgentEvent.GetLabel(METRIC_LABEL_KEY_PROJECT)); + APSARA_TEST_EQUAL("Linux", wantAgentEvent.GetLabel(METRIC_LABEL_KEY_OS)); + APSARA_TEST_EQUAL("", wantAgentEvent.GetLabel("")); + APSARA_TEST_EQUAL(1, wantAgentEvent.GetCounter("test_counter")); + APSARA_TEST_EQUAL(0, wantAgentEvent.GetCounter("")); + APSARA_TEST_EQUAL(0.3, wantAgentEvent.GetGauge(METRIC_AGENT_CPU)); + APSARA_TEST_EQUAL(99, wantAgentEvent.GetGauge(METRIC_AGENT_MEMORY)); + APSARA_TEST_EQUAL(0, wantAgentEvent.GetGauge("")); + } + { // test set/get runner metric + SelfMonitorMetricEvent originRunnerEvent; + SelfMonitorMetricEvent wantRunnerEvent; + APSARA_TEST_FALSE(LoongCollectorMonitor::GetInstance()->GetRunnerMetric("", wantRunnerEvent)); + + // set + std::string runnerName = METRIC_LABEL_VALUE_RUNNER_NAME_HTTP_SINK; + originRunnerEvent.mCategory = MetricCategory::METRIC_CATEGORY_RUNNER; + originRunnerEvent.mLabels + = {{METRIC_LABEL_KEY_RUNNER_NAME, runnerName}, {METRIC_LABEL_KEY_PROJECT, "test_project"}}; + originRunnerEvent.mCounters = {{METRIC_RUNNER_IN_EVENTS_TOTAL, 1}, {METRIC_RUNNER_TOTAL_DELAY_MS, 99}}; + originRunnerEvent.mGauges = {{METRIC_RUNNER_LAST_RUN_TIME, 1111111}}; + LoongCollectorMonitor::GetInstance()->SetRunnerMetric(runnerName, originRunnerEvent); + + // get + APSARA_TEST_FALSE(LoongCollectorMonitor::GetInstance()->GetRunnerMetric("", wantRunnerEvent)); + APSARA_TEST_TRUE(LoongCollectorMonitor::GetInstance()->GetRunnerMetric(runnerName, wantRunnerEvent)); + APSARA_TEST_EQUAL(MetricCategory::METRIC_CATEGORY_RUNNER, wantRunnerEvent.mCategory); + APSARA_TEST_EQUAL("test_project", wantRunnerEvent.GetLabel(METRIC_LABEL_KEY_PROJECT)); + APSARA_TEST_EQUAL(runnerName, wantRunnerEvent.GetLabel(METRIC_LABEL_KEY_RUNNER_NAME)); + APSARA_TEST_EQUAL("", wantRunnerEvent.GetLabel("")); + APSARA_TEST_EQUAL(1, wantRunnerEvent.GetCounter(METRIC_RUNNER_IN_EVENTS_TOTAL)); + APSARA_TEST_EQUAL(99, wantRunnerEvent.GetCounter(METRIC_RUNNER_TOTAL_DELAY_MS)); + APSARA_TEST_EQUAL(0, wantRunnerEvent.GetCounter("")); + APSARA_TEST_EQUAL(1111111, wantRunnerEvent.GetGauge(METRIC_RUNNER_LAST_RUN_TIME)); + APSARA_TEST_EQUAL(0, wantRunnerEvent.GetGauge("")); + } +} + } // namespace logtail int main(int argc, char** argv) {