Skip to content

Commit

Permalink
Modify relevant monitoring metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
lzydmxy committed Jun 17, 2024
1 parent b0976e9 commit 663eddc
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/Service/Metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace RK

inline UInt64 getCurrentTimeMilliseconds()
{
return duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
return duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
};


Expand Down
18 changes: 10 additions & 8 deletions src/Service/RequestProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ void RequestProcessor::systemExist()
void RequestProcessor::run()
{
setThreadName("ReqProcessor");
Stopwatch watch;

while (!shutdown_called)
{
Expand All @@ -43,7 +42,7 @@ void RequestProcessor::run()
/// pending queue in our wait condition, it will result in meaningless waiting.
bool pending_requests_empty = true;

for (auto it = pending_requests.begin(); it != pending_requests.end(); )
for (auto it = pending_requests.begin(); it != pending_requests.end();)
{
if (it->second.empty())
{
Expand Down Expand Up @@ -82,18 +81,16 @@ void RequestProcessor::run()
}

/// 1. process read request
watch.restart();
moveRequestToPendingQueue();

/// 2. process committed request, single thread
watch.restart();
size_t committed_request_size = committed_queue.size();

if (committed_request_size == 0)
continue;


auto queuesToDrain = processCommittedRequest(committed_request_size);
Metrics::getMetrics().apply_write_request_time_ms->add(watch.elapsedMilliseconds());

/// 3. process error requests
processErrorRequest(error_request_size);
Expand Down Expand Up @@ -256,6 +253,8 @@ std::unordered_set<int64_t> RequestProcessor::processCommittedRequest(size_t com
cv.wait(lk, [this]{ return numRequestsProcessing == 0 || shutdown_called; });
}

auto start_time_ms = getCurrentTimeMilliseconds();

std::unordered_set<int64_t> queues_to_drain;

size_t commits_processed = 0;
Expand Down Expand Up @@ -328,8 +327,7 @@ std::unordered_set<int64_t> RequestProcessor::processCommittedRequest(size_t com
committed_queue.pop();

LOG_DEBUG(log, "Move committed(write) request {} from committed_queue and pending_requests", committed_request.toSimpleString());
auto current_time = getCurrentTimeMilliseconds();
Metrics::getMetrics().update_latency->add(current_time - committed_request.create_time);
Metrics::getMetrics().update_latency->add(getCurrentTimeMilliseconds() - committed_request.create_time);

/// remove request from pending queue
auto & pending_requests_for_session = pending_requests[committed_request.session_id];
Expand All @@ -345,6 +343,7 @@ std::unordered_set<int64_t> RequestProcessor::processCommittedRequest(size_t com
}
}

Metrics::getMetrics().apply_write_request_time_ms->add(getCurrentTimeMilliseconds() - start_time_ms);
Metrics::getMetrics().write_commit_proc_issued->add(commits_processed);

return queues_to_drain;
Expand Down Expand Up @@ -522,7 +521,10 @@ void RequestProcessor::readRequestProcessor(RunnerId runner_id)
break;
applyRequest(request_for_session);

LOG_DEBUG(log, "numRequestsProcessing {}", numRequestsProcessing.load());
Metrics::getMetrics().read_latency->add(getCurrentTimeMilliseconds() - request_for_session.create_time);


LOG_DEBUG(log, "NumRequestsProcessing {}", numRequestsProcessing.load());

if (--numRequestsProcessing == 0)
{
Expand Down

0 comments on commit 663eddc

Please sign in to comment.