Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Global Metric Registration for Other Module Access #2108

Merged
merged 3 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions core/monitor/Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

#include "Monitor.h"

#include "MetricRecord.h"
#if defined(__linux__)
#include <asm/param.h>
#include <unistd.h>
Expand Down Expand Up @@ -177,7 +179,9 @@ void LogtailMonitor::Monitor() {
lastCheckHardLimitTime = monitorTime;

GetMemStat();
LoongCollectorMonitor::GetInstance()->SetAgentMemory(mMemStat.mRss);
CalCpuStat(curCpuStat, mCpuStat);
LoongCollectorMonitor::GetInstance()->SetAgentCpu(mCpuStat.mCpuUsage);
if (CheckHardMemLimit()) {
LOG_ERROR(sLogger,
("Resource used by program exceeds hard limit",
Expand Down Expand Up @@ -247,10 +251,6 @@ bool LogtailMonitor::SendStatusProfile(bool suicide) {
sleep(10);
_exit(1);
}
// CPU usage of Logtail process.
LoongCollectorMonitor::GetInstance()->SetAgentCpu(mCpuStat.mCpuUsage);
// Memory usage of Logtail process.
LoongCollectorMonitor::GetInstance()->SetAgentMemory(mMemStat.mRss);

return mIsThreadRunning;
}
Expand Down Expand Up @@ -635,4 +635,35 @@ void LoongCollectorMonitor::Stop() {
LOG_INFO(sLogger, ("LoongCollector monitor", "stopped successfully"));
}

// Copies the currently stored agent-level metric snapshot into `event`.
// The copy is performed while holding mGlobalMetricsMux.
// Always returns true.
bool LoongCollectorMonitor::GetAgentMetric(SelfMonitorMetricEvent& event) {
    {
        const std::lock_guard<std::mutex> guard(mGlobalMetricsMux);
        event = mGlobalMetrics.mAgentMetric;
    }
    return true;
}

// Replaces the stored agent-level metric snapshot with a copy of `event`.
// The assignment is performed while holding mGlobalMetricsMux.
void LoongCollectorMonitor::SetAgentMetric(const SelfMonitorMetricEvent& event) {
    const std::lock_guard<std::mutex> guard(mGlobalMetricsMux);
    auto& agentSlot = mGlobalMetrics.mAgentMetric;
    agentSlot = event;
}

// Looks up the stored metric snapshot for the runner named `runnerName`.
//
// @param runnerName  key into mGlobalMetrics.mRunnerMetrics; an empty name is
//                    rejected before the mutex is taken.
// @param event       receives a copy of the stored snapshot on success and is
//                    left untouched otherwise.
// @return true if a snapshot for `runnerName` exists, false if the name is
//         empty or no snapshot has been stored under it.
bool LoongCollectorMonitor::GetRunnerMetric(const std::string& runnerName, SelfMonitorMetricEvent& event) {
    if (runnerName.empty()) {
        return false;
    }
    std::lock_guard<std::mutex> lock(mGlobalMetricsMux);
    // Single lookup: reuse the iterator instead of find() followed by operator[].
    const auto it = mGlobalMetrics.mRunnerMetrics.find(runnerName);
    if (it == mGlobalMetrics.mRunnerMetrics.end()) {
        return false;
    }
    event = it->second;
    return true;
}

// Stores a copy of `event` as the metric snapshot for the runner named
// `runnerName`, creating the entry or overwriting an existing one.
// A call with an empty runner name does nothing.
void LoongCollectorMonitor::SetRunnerMetric(const std::string& runnerName, const SelfMonitorMetricEvent& event) {
    if (!runnerName.empty()) {
        const std::lock_guard<std::mutex> guard(mGlobalMetricsMux);
        auto& runnerTable = mGlobalMetrics.mRunnerMetrics;
        runnerTable[runnerName] = event;
    }
}

} // namespace logtail
17 changes: 17 additions & 0 deletions core/monitor/Monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include <future>
#include <mutex>
#include <string>
#include <unordered_map>

#include "MetricManager.h"
#include "MetricTypes.h"

#if defined(_MSC_VER)
#include <Windows.h>
Expand Down Expand Up @@ -188,6 +190,11 @@ class LoongCollectorMonitor {
void Init();
void Stop();

bool GetAgentMetric(SelfMonitorMetricEvent& event);
void SetAgentMetric(const SelfMonitorMetricEvent& event);
bool GetRunnerMetric(const std::string& runnerName, SelfMonitorMetricEvent& event);
void SetRunnerMetric(const std::string& runnerName, const SelfMonitorMetricEvent& event);

// Forwards `cpu` to the mAgentCpu gauge through the SET_GAUGE macro.
void SetAgentCpu(double cpu) { SET_GAUGE(mAgentCpu, cpu); }
// Forwards `mem` to the mAgentMemory gauge through the SET_GAUGE macro.
void SetAgentMemory(uint64_t mem) { SET_GAUGE(mAgentMemory, mem); }
// Forwards `mem` to the mAgentGoMemory gauge through the SET_GAUGE macro.
void SetAgentGoMemory(uint64_t mem) { SET_GAUGE(mAgentGoMemory, mem); }
Expand All @@ -214,6 +221,16 @@ class LoongCollectorMonitor {
LoongCollectorMonitor();
~LoongCollectorMonitor();

// A copy of the global-level metrics, updated by SelfMonitorServer::PushSelfMonitorMetricEvents.
// Its format is: {MetricCategory: {key: MetricValue}}
// Storing and retrieving Agent and Runner metrics is currently supported.
struct GlobalMetrics {
// Latest agent-category metric event.
SelfMonitorMetricEvent mAgentMetric;
// Latest runner-category metric events, keyed by runner name.
std::unordered_map<std::string, SelfMonitorMetricEvent> mRunnerMetrics;
};
std::mutex mGlobalMetricsMux;
GlobalMetrics mGlobalMetrics;

// MetricRecord
MetricsRecordRef mMetricsRecordRef;

Expand Down
7 changes: 5 additions & 2 deletions core/monitor/SelfMonitorServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

#include "monitor/SelfMonitorServer.h"

#include "common/LogtailCommonFlags.h"
#include "MetricConstants.h"
#include "Monitor.h"
#include "runner/ProcessorRunner.h"

using namespace std;
Expand Down Expand Up @@ -139,8 +140,10 @@ void SelfMonitorServer::PushSelfMonitorMetricEvents(std::vector<SelfMonitorMetri
for (auto event : events) {
bool shouldSkip = false;
if (event.mCategory == MetricCategory::METRIC_CATEGORY_AGENT) {
LoongCollectorMonitor::GetInstance()->SetAgentMetric(event);
shouldSkip = !ProcessSelfMonitorMetricEvent(event, mSelfMonitorMetricRules->mAgentMetricsRule);
} else if (event.mCategory == MetricCategory::METRIC_CATEGORY_RUNNER) {
LoongCollectorMonitor::GetInstance()->SetRunnerMetric(event.GetLabel(METRIC_LABEL_KEY_RUNNER_NAME), event);
shouldSkip = !ProcessSelfMonitorMetricEvent(event, mSelfMonitorMetricRules->mRunnerMetricsRule);
} else if (event.mCategory == MetricCategory::METRIC_CATEGORY_COMPONENT) {
shouldSkip = !ProcessSelfMonitorMetricEvent(event, mSelfMonitorMetricRules->mComponentMetricsRule);
Expand All @@ -157,7 +160,7 @@ void SelfMonitorServer::PushSelfMonitorMetricEvents(std::vector<SelfMonitorMetri
if (mSelfMonitorMetricEventMap.find(event.mKey) != mSelfMonitorMetricEventMap.end()) {
mSelfMonitorMetricEventMap[event.mKey].Merge(event);
} else {
mSelfMonitorMetricEventMap[event.mKey] = std::move(event);
mSelfMonitorMetricEventMap[event.mKey] = event;
}
}
}
Expand Down
38 changes: 28 additions & 10 deletions core/monitor/metric_models/SelfMonitorMetricEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ const string METRIC_GO_KEY_LABELS = "labels";
const string METRIC_GO_KEY_COUNTERS = "counters";
const string METRIC_GO_KEY_GAUGES = "gauges";

SelfMonitorMetricEvent::SelfMonitorMetricEvent() {
}

SelfMonitorMetricEvent::SelfMonitorMetricEvent(MetricsRecord* metricRecord) {
// category
mCategory = metricRecord->GetCategory();
Expand Down Expand Up @@ -131,14 +128,14 @@ void SelfMonitorMetricEvent::CreateKey() {
}

// Sets the send interval for this event and restarts the count of intervals
// elapsed since the last send.
//
// @param interval  new value for mSendInterval, expressed as a number of
//                  ShouldSend() calls; narrowed to the member's int32_t type.
void SelfMonitorMetricEvent::SetInterval(size_t interval) {
    // Only the renamed counter is reset; the stale statement on the
    // pre-rename member (mLastSendInterval) is dropped.
    mIntervalsSinceLastSend = 0;
    mSendInterval = static_cast<int32_t>(interval);
}

void SelfMonitorMetricEvent::Merge(SelfMonitorMetricEvent& event) {
void SelfMonitorMetricEvent::Merge(const SelfMonitorMetricEvent& event) {
if (mSendInterval != event.mSendInterval) {
mSendInterval = event.mSendInterval;
mLastSendInterval = 0;
mIntervalsSinceLastSend = 0;
}
for (auto counter = event.mCounters.begin(); counter != event.mCounters.end(); counter++) {
if (mCounters.find(counter->first) != mCounters.end())
Expand All @@ -153,12 +150,12 @@ void SelfMonitorMetricEvent::Merge(SelfMonitorMetricEvent& event) {
}

// Advances the count of intervals elapsed since the last send by one and
// reports whether the event is due to be sent.
//
// Note that every call increments mIntervalsSinceLastSend.
//
// @return true when the incremented count has reached mSendInterval and
//         mUpdatedFlag is set.
bool SelfMonitorMetricEvent::ShouldSend() {
    // Only the renamed counter is used; the earlier increment/return pair on
    // the pre-rename member made the statements below unreachable.
    ++mIntervalsSinceLastSend;
    return (mIntervalsSinceLastSend >= mSendInterval) && mUpdatedFlag;
}

// Reports whether the event can be dropped. Unlike ShouldSend(), this does
// not modify the interval counter.
//
// @return true when the count of intervals since the last send has reached
//         mSendInterval and mUpdatedFlag is not set.
bool SelfMonitorMetricEvent::ShouldDelete() {
    // Only the renamed counter is consulted; the duplicate return on the
    // pre-rename member is dropped.
    return (mIntervalsSinceLastSend >= mSendInterval) && !mUpdatedFlag;
}

void SelfMonitorMetricEvent::ReadAsMetricEvent(MetricEvent* metricEventPtr) {
Expand All @@ -182,8 +179,29 @@ void SelfMonitorMetricEvent::ReadAsMetricEvent(MetricEvent* metricEventPtr) {
gauge->first, {UntypedValueMetricType::MetricTypeGauge, gauge->second});
}
// set flags
mLastSendInterval = 0;
mIntervalsSinceLastSend = 0;
mUpdatedFlag = false;
}

// Returns the value of the label named `labelKey`.
//
// @param labelKey  key to look up in mLabels.
// @return a copy of the stored label value, or an empty string when no label
//         with that key exists.
std::string SelfMonitorMetricEvent::GetLabel(const std::string& labelKey) {
    // Single lookup: reuse the iterator instead of find() followed by at().
    const auto it = mLabels.find(labelKey);
    if (it == mLabels.end()) {
        return "";
    }
    return it->second;
}

// Returns the value of the counter named `counterName`.
//
// @param counterName  key to look up in mCounters.
// @return the stored counter value, or 0 when no counter with that name
//         exists (a missing counter and a counter equal to 0 are therefore
//         indistinguishable to the caller).
uint64_t SelfMonitorMetricEvent::GetCounter(const std::string& counterName) {
    // Single lookup: reuse the iterator instead of find() followed by at().
    const auto it = mCounters.find(counterName);
    if (it == mCounters.end()) {
        return 0;
    }
    return it->second;
}

// Returns the value of the gauge named `gaugeName`.
//
// @param gaugeName  key to look up in mGauges.
// @return the stored gauge value, or 0 when no gauge with that name exists
//         (a missing gauge and a gauge equal to 0 are therefore
//         indistinguishable to the caller).
double SelfMonitorMetricEvent::GetGauge(const std::string& gaugeName) {
    // Single lookup: reuse the iterator instead of find() followed by at().
    const auto it = mGauges.find(gaugeName);
    if (it == mGauges.end()) {
        return 0;
    }
    return it->second;
}

} // namespace logtail
19 changes: 14 additions & 5 deletions core/monitor/metric_models/SelfMonitorMetricEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/

#pragma once
#include "MetricRecord.h"
#include "models/PipelineEventGroup.h"
#include <map>

#include "models/MetricEvent.h"
#include "monitor/metric_models/MetricRecord.h"

namespace logtail {

Expand All @@ -37,17 +39,24 @@ struct SelfMonitorMetricRules {
using SelfMonitorMetricEventKey = int64_t;
class SelfMonitorMetricEvent {
public:
SelfMonitorMetricEvent();
SelfMonitorMetricEvent() = default;
SelfMonitorMetricEvent(const SelfMonitorMetricEvent& event) = default;

SelfMonitorMetricEvent(MetricsRecord* metricRecord);
SelfMonitorMetricEvent(const std::map<std::string, std::string>& metricRecord);

void SetInterval(size_t interval);
void Merge(SelfMonitorMetricEvent& event);
void Merge(const SelfMonitorMetricEvent& event);

bool ShouldSend();
bool ShouldDelete();
void ReadAsMetricEvent(MetricEvent* metricEventPtr);

// These getters should only be called on a read-only object that is no longer being modified, so no locking is needed.
std::string GetLabel(const std::string& labelKey);
uint64_t GetCounter(const std::string& counterName);
double GetGauge(const std::string& gaugeName);

SelfMonitorMetricEventKey mKey; // labels + category
std::string mCategory; // category
private:
Expand All @@ -57,7 +66,7 @@ class SelfMonitorMetricEvent {
std::unordered_map<std::string, uint64_t> mCounters;
std::unordered_map<std::string, double> mGauges;
int32_t mSendInterval;
int32_t mLastSendInterval;
int32_t mIntervalsSinceLastSend;
bool mUpdatedFlag;

#ifdef APSARA_UNIT_TEST_MAIN
Expand Down
60 changes: 59 additions & 1 deletion core/unittest/monitor/SelfMonitorMetricEventUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "MetricConstants.h"
#include "MetricRecord.h"
#include "Monitor.h"
#include "monitor/MetricManager.h"
#include "monitor/metric_models/SelfMonitorMetricEvent.h"
#include "unittest/Unittest.h"
Expand All @@ -27,6 +30,7 @@ class SelfMonitorMetricEventUnittest : public ::testing::Test {
void TestCreateFromGoMetricMap();
void TestMerge();
void TestSendInterval();
void TestGlobalMetrics();

private:
std::shared_ptr<SourceBuffer> mSourceBuffer;
Expand All @@ -38,6 +42,7 @@ APSARA_UNIT_TEST_CASE(SelfMonitorMetricEventUnittest, TestCreateFromMetricEvent,
APSARA_UNIT_TEST_CASE(SelfMonitorMetricEventUnittest, TestCreateFromGoMetricMap, 1);
APSARA_UNIT_TEST_CASE(SelfMonitorMetricEventUnittest, TestMerge, 2);
APSARA_UNIT_TEST_CASE(SelfMonitorMetricEventUnittest, TestSendInterval, 3);
APSARA_UNIT_TEST_CASE(SelfMonitorMetricEventUnittest, TestGlobalMetrics, 4);

void SelfMonitorMetricEventUnittest::TestCreateFromMetricEvent() {
std::vector<std::pair<std::string, std::string>> labels;
Expand Down Expand Up @@ -165,7 +170,7 @@ void SelfMonitorMetricEventUnittest::TestMerge() {
event1.Merge(event2);

// Verify that the interval was set to event2's interval
APSARA_TEST_EQUAL(0, event1.mLastSendInterval);
APSARA_TEST_EQUAL(0, event1.mIntervalsSinceLastSend);
APSARA_TEST_EQUAL(10, event1.mSendInterval);
// Verify that the counters were merged correctly
APSARA_TEST_EQUAL(300, event1.mCounters["counter1"]);
Expand Down Expand Up @@ -198,6 +203,59 @@ void SelfMonitorMetricEventUnittest::TestSendInterval() {
APSARA_TEST_TRUE(event.ShouldDelete()); // 第三次调用,间隔计数达到3,应返回true
}

void SelfMonitorMetricEventUnittest::TestGlobalMetrics() {
{ // test set/get agent metric
SelfMonitorMetricEvent originAgentEvent;
SelfMonitorMetricEvent wantAgentEvent;

// set
originAgentEvent.mCategory = MetricCategory::METRIC_CATEGORY_AGENT;
originAgentEvent.mLabels = {{METRIC_LABEL_KEY_PROJECT, "test_project"}, {METRIC_LABEL_KEY_OS, "Linux"}};
originAgentEvent.mCounters = {{"test_counter", 1}};
originAgentEvent.mGauges = {{METRIC_AGENT_CPU, 0.3}, {METRIC_AGENT_MEMORY, 99}};
LoongCollectorMonitor::GetInstance()->SetAgentMetric(originAgentEvent);

// get
APSARA_TEST_TRUE(LoongCollectorMonitor::GetInstance()->GetAgentMetric(wantAgentEvent));
APSARA_TEST_EQUAL(MetricCategory::METRIC_CATEGORY_AGENT, wantAgentEvent.mCategory);
APSARA_TEST_EQUAL("test_project", wantAgentEvent.GetLabel(METRIC_LABEL_KEY_PROJECT));
APSARA_TEST_EQUAL("Linux", wantAgentEvent.GetLabel(METRIC_LABEL_KEY_OS));
APSARA_TEST_EQUAL("", wantAgentEvent.GetLabel(""));
APSARA_TEST_EQUAL(1, wantAgentEvent.GetCounter("test_counter"));
APSARA_TEST_EQUAL(0, wantAgentEvent.GetCounter(""));
APSARA_TEST_EQUAL(0.3, wantAgentEvent.GetGauge(METRIC_AGENT_CPU));
APSARA_TEST_EQUAL(99, wantAgentEvent.GetGauge(METRIC_AGENT_MEMORY));
APSARA_TEST_EQUAL(0, wantAgentEvent.GetGauge(""));
}
{ // test set/get runner metric
SelfMonitorMetricEvent originRunnerEvent;
SelfMonitorMetricEvent wantRunnerEvent;
APSARA_TEST_FALSE(LoongCollectorMonitor::GetInstance()->GetRunnerMetric("", wantRunnerEvent));

// set
std::string runnerName = METRIC_LABEL_VALUE_RUNNER_NAME_HTTP_SINK;
originRunnerEvent.mCategory = MetricCategory::METRIC_CATEGORY_RUNNER;
originRunnerEvent.mLabels
= {{METRIC_LABEL_KEY_RUNNER_NAME, runnerName}, {METRIC_LABEL_KEY_PROJECT, "test_project"}};
originRunnerEvent.mCounters = {{METRIC_RUNNER_IN_EVENTS_TOTAL, 1}, {METRIC_RUNNER_TOTAL_DELAY_MS, 99}};
originRunnerEvent.mGauges = {{METRIC_RUNNER_LAST_RUN_TIME, 1111111}};
LoongCollectorMonitor::GetInstance()->SetRunnerMetric(runnerName, originRunnerEvent);

// get
APSARA_TEST_FALSE(LoongCollectorMonitor::GetInstance()->GetRunnerMetric("", wantRunnerEvent));
APSARA_TEST_TRUE(LoongCollectorMonitor::GetInstance()->GetRunnerMetric(runnerName, wantRunnerEvent));
APSARA_TEST_EQUAL(MetricCategory::METRIC_CATEGORY_RUNNER, wantRunnerEvent.mCategory);
APSARA_TEST_EQUAL("test_project", wantRunnerEvent.GetLabel(METRIC_LABEL_KEY_PROJECT));
APSARA_TEST_EQUAL(runnerName, wantRunnerEvent.GetLabel(METRIC_LABEL_KEY_RUNNER_NAME));
APSARA_TEST_EQUAL("", wantRunnerEvent.GetLabel(""));
APSARA_TEST_EQUAL(1, wantRunnerEvent.GetCounter(METRIC_RUNNER_IN_EVENTS_TOTAL));
APSARA_TEST_EQUAL(99, wantRunnerEvent.GetCounter(METRIC_RUNNER_TOTAL_DELAY_MS));
APSARA_TEST_EQUAL(0, wantRunnerEvent.GetCounter(""));
APSARA_TEST_EQUAL(1111111, wantRunnerEvent.GetGauge(METRIC_RUNNER_LAST_RUN_TIME));
APSARA_TEST_EQUAL(0, wantRunnerEvent.GetGauge(""));
}
}

} // namespace logtail

int main(int argc, char** argv) {
Expand Down
Loading