diff --git a/src/Service/Metrics.h b/src/Service/Metrics.h index ad9892e58a..e3aecf0633 100644 --- a/src/Service/Metrics.h +++ b/src/Service/Metrics.h @@ -16,7 +16,7 @@ namespace RK inline UInt64 getCurrentTimeMilliseconds() { - return duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + return duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); }; diff --git a/src/Service/RequestProcessor.cpp b/src/Service/RequestProcessor.cpp index e3f5f059eb..2025daac55 100644 --- a/src/Service/RequestProcessor.cpp +++ b/src/Service/RequestProcessor.cpp @@ -28,7 +28,6 @@ void RequestProcessor::systemExist() void RequestProcessor::run() { setThreadName("ReqProcessor"); - Stopwatch watch; while (!shutdown_called) { @@ -43,7 +42,7 @@ void RequestProcessor::run() /// pending queue in our wait condition, it will result in meaningless waiting. bool pending_requests_empty = true; - for (auto it = pending_requests.begin(); it != pending_requests.end(); ) + for (auto it = pending_requests.begin(); it != pending_requests.end();) { if (it->second.empty()) { @@ -82,18 +81,16 @@ void RequestProcessor::run() } /// 1. process read request - watch.restart(); moveRequestToPendingQueue(); /// 2. process committed request, single thread - watch.restart(); size_t committed_request_size = committed_queue.size(); if (committed_request_size == 0) continue; + auto queuesToDrain = processCommittedRequest(committed_request_size); - Metrics::getMetrics().apply_write_request_time_ms->add(watch.elapsedMilliseconds()); /// 3. process error requests processErrorRequest(error_request_size); @@ -256,6 +253,8 @@ std::unordered_set RequestProcessor::processCommittedRequest(size_t com cv.wait(lk, [this]{ return numRequestsProcessing == 0 || shutdown_called; }); } + auto start_time_ms = getCurrentTimeMilliseconds(); + std::unordered_set queues_to_drain; size_t commits_processed = 0; @@ -328,8 +327,7 @@ std::unordered_set RequestProcessor::processCommittedRequest(size_t com committed_queue.pop(); LOG_DEBUG(log, "Move committed(write) request {} from committed_queue and pending_requests", committed_request.toSimpleString()); - auto current_time = getCurrentTimeMilliseconds(); - Metrics::getMetrics().update_latency->add(current_time - committed_request.create_time); + Metrics::getMetrics().update_latency->add(getCurrentTimeMilliseconds() - committed_request.create_time); /// remove request from pending queue auto & pending_requests_for_session = pending_requests[committed_request.session_id]; @@ -345,6 +343,7 @@ std::unordered_set RequestProcessor::processCommittedRequest(size_t com } } + Metrics::getMetrics().apply_write_request_time_ms->add(getCurrentTimeMilliseconds() - start_time_ms); Metrics::getMetrics().write_commit_proc_issued->add(commits_processed); return queues_to_drain; @@ -522,7 +521,10 @@ void RequestProcessor::readRequestProcessor(RunnerId runner_id) break; applyRequest(request_for_session); - LOG_DEBUG(log, "numRequestsProcessing {}", numRequestsProcessing.load()); + Metrics::getMetrics().read_latency->add(getCurrentTimeMilliseconds() - request_for_session.create_time); + + + LOG_DEBUG(log, "NumRequestsProcessing {}", numRequestsProcessing.load()); if (--numRequestsProcessing == 0) {