Skip to content

Commit

Permalink
kvserver/rangefeed: add kv.rangefeed.output_loop_nanos
Browse files Browse the repository at this point in the history
This patch introduces a new metric, kv.rangefeed.output_loop_nanos, which tracks
the duration of the per-range, per-registration runOutputLoop goroutine. This
goroutine is expected to be short-lived for unbuffered registrations, while it
remains long-lived for buffered registrations. Note that this metric is only
non-zero for unbuffered registrations.

Part of: #129816
Release note: none
  • Loading branch information
wenyihu6 committed Dec 10, 2024
1 parent 086dcc3 commit 63e2a8a
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@
<tr><td>STORAGE</td><td>kv.rangefeed.closed_timestamp_max_behind_nanos</td><td>Largest latency between realtime and replica max closed timestamp for replicas that have active rangeeds on them</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.mem_shared</td><td>Memory usage by rangefeeds</td><td>Memory</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.mem_system</td><td>Memory usage by rangefeeds on system ranges</td><td>Memory</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.output_loop_nanos</td><td>Duration of the Rangefeed O(range) output loop goroutine</td><td>Nanoseconds</td><td>COUNTER</td><td>NANOSECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.processors_goroutine</td><td>Number of active RangeFeed processors using goroutines</td><td>Processors</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.processors_scheduler</td><td>Number of active RangeFeed processors using scheduler</td><td>Processors</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.registrations</td><td>Number of active RangeFeed registrations</td><td>Registrations</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/rangefeed/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ import (
)

var (
metaRangeFeedOutputLoopNanos = metric.Metadata{
Name: "kv.rangefeed.output_loop_nanos",
Help: "Duration of the Rangefeed O(range) output loop goroutine",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaRangeFeedCatchUpScanNanos = metric.Metadata{
Name: "kv.rangefeed.catchup_scan_nanos",
Help: "Time spent in RangeFeed catchup scan",
Expand Down Expand Up @@ -98,6 +104,7 @@ var (

// Metrics are for production monitoring of RangeFeeds.
type Metrics struct {
RangefeedOutputLoopNanos *metric.Counter
RangeFeedCatchUpScanNanos *metric.Counter
RangeFeedBudgetExhausted *metric.Counter
RangefeedProcessorQueueTimeout *metric.Counter
Expand Down Expand Up @@ -126,6 +133,7 @@ func (*Metrics) MetricStruct() {}
// NewMetrics makes the metrics for RangeFeeds monitoring.
func NewMetrics() *Metrics {
return &Metrics{
RangefeedOutputLoopNanos: metric.NewCounter(metaRangeFeedOutputLoopNanos),
RangeFeedCatchUpScanNanos: metric.NewCounter(metaRangeFeedCatchUpScanNanos),
RangefeedProcessorQueueTimeout: metric.NewCounter(metaQueueTimeout),
RangeFeedBudgetExhausted: metric.NewCounter(metaRangeFeedExhausted),
Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/rangefeed/unbuffered_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,13 @@ func (ubr *unbufferedRegistration) IsDisconnected() bool {
//
// nolint:deferunlockcheck
func (ubr *unbufferedRegistration) runOutputLoop(ctx context.Context, forStacks roachpb.RangeID) {
start := timeutil.Now()
ubr.mu.Lock()
// Noop if publishCatchUpBuffer below returns no error.
defer ubr.drainAllocations(ctx)
defer func() {
// Noop if publishCatchUpBuffer below returns no error.
ubr.drainAllocations(ctx)
ubr.metrics.RangefeedOutputLoopNanos.Inc(timeutil.Since(start).Nanoseconds())
}()

if ubr.mu.disconnected {
ubr.mu.Unlock()
Expand Down

0 comments on commit 63e2a8a

Please sign in to comment.