Skip to content

Commit

Permalink
changefeedccl: add scoping support for max_behind_nanos
Browse files Browse the repository at this point in the history
Currently, changefeed.max_behind_nanos measures the lag of the
furthest-behind changefeed. We'd like to be able to support
scoping (grouping) of changefeeds. This change applies scoping to
that metric, similar to the existing scoping on changefeed.aggregator_progress.

Epic: none
Fixes: #132281

Release note (ops change): the changefeed.max_behind_nanos metric
now supports scoping with metric labels.
  • Loading branch information
aerfrei committed Dec 17, 2024
1 parent 46e8e0f commit 6e1a766
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 50 deletions.
18 changes: 2 additions & 16 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,14 +1233,14 @@ func newChangeFrontierProcessor(
return nil, err
}

sliMertics, err := flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics(cf.spec.Feed.Opts[changefeedbase.OptMetricsScope])
sliMetrics, err := flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics(cf.spec.Feed.Opts[changefeedbase.OptMetricsScope])
if err != nil {
return nil, err
}

if cf.encoder, err = getEncoder(
ctx, encodingOpts, AllTargets(spec.Feed), spec.Feed.Select != "",
makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMertics,
makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMetrics,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1467,18 +1467,13 @@ func (cf *changeFrontier) close() {
}
}

// closeMetrics de-registers from the progress registry that powers
// `changefeed.max_behind_nanos`. This method is idempotent.
func (cf *changeFrontier) closeMetrics() {
// Delete this feed from the MaxBehindNanos metric so it's no longer
// considered by the gauge.
func() {
cf.metrics.mu.Lock()
defer cf.metrics.mu.Unlock()
if cf.metricsID > 0 {
cf.sliMetrics.RunningCount.Dec(1)
}
delete(cf.metrics.mu.resolved, cf.metricsID)
cf.metricsID = -1
}()

Expand Down Expand Up @@ -1640,15 +1635,6 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
// all feeds in the scope.
cf.sliMetrics.setCheckpoint(cf.sliMetricsID, newResolved)

// This backs max_behind_nanos which is deprecated in favor of checkpoint_progress
func() {
cf.metrics.mu.Lock()
defer cf.metrics.mu.Unlock()
if cf.metricsID != -1 {
cf.metrics.mu.resolved[cf.metricsID] = newResolved
}
}()

return cf.maybeEmitResolved(newResolved)
}

Expand Down
17 changes: 16 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,20 @@ func TestChangefeedProgressMetrics(t *testing.T) {
})
}

// Verify that max_behind_nanos has recurring updates
var lastValue int64 = 0
for i := 0; i < 3; i++ {
testutils.SucceedsSoon(t, func() error {
value := sliA.MaxBehindNanos.Value()
if value != lastValue {
lastValue = value
return nil
}
return errors.Newf("waiting for max_behind_nanos to update %d",
lastValue)
})
}

sliB, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("label_b")
require.Equal(t, int64(0), sliB.AggregatorProgress.Value())
fooB := feed(t, f, `CREATE CHANGEFEED FOR foo WITH metrics_label='label_b', resolved='100ms'`)
Expand All @@ -450,7 +464,8 @@ func TestChangefeedProgressMetrics(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
aggregatorProgress := sliA.AggregatorProgress.Value()
checkpointProgress := sliA.CheckpointProgress.Value()
if aggregatorProgress == 0 && checkpointProgress == 0 {
maxBehindNanos := sliA.MaxBehindNanos.Value()
if aggregatorProgress == 0 && checkpointProgress == 0 && maxBehindNanos == 0 {
return nil
}
return errors.Newf("waiting for progress metrics to be 0 (ap=%d, cp=%d)",
Expand Down
71 changes: 39 additions & 32 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type AggMetrics struct {
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram
SinkErrors *aggmetric.AggCounter
MaxBehindNanos *aggmetric.AggGauge

Timers *timers.Timers

Expand Down Expand Up @@ -165,6 +166,7 @@ type sliMetrics struct {
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram
SinkErrors *aggmetric.Counter
MaxBehindNanos *aggmetric.Gauge

Timers *timers.ScopedTimers

Expand Down Expand Up @@ -721,17 +723,6 @@ var (
Unit: metric.Unit_NANOSECONDS,
}

// TODO(dan): This was intended to be a measure of the minimum distance of
// any changefeed ahead of its gc ttl threshold, but keeping that correct in
// the face of changing zone configs is much harder, so this will have to do
// for now.
metaChangefeedMaxBehindNanos = metric.Metadata{
Name: "changefeed.max_behind_nanos",
Help: "(Deprecated in favor of checkpoint_progress) The most any changefeed's persisted checkpoint is behind the present",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

metaChangefeedFrontierUpdates = metric.Metadata{
Name: "changefeed.frontier_updates",
Help: "Number of change frontier updates across all feeds",
Expand Down Expand Up @@ -986,6 +977,16 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
Measurement: "Count",
Unit: metric.Unit_COUNT,
}
// TODO(dan): This was intended to be a measure of the minimum distance of
// any changefeed ahead of its gc ttl threshold, but keeping that correct in
// the face of changing zone configs is much harder, so this will have to do
// for now.
metaChangefeedMaxBehindNanos := metric.Metadata{
Name: "changefeed.max_behind_nanos",
Help: "(Deprecated in favor of checkpoint_progress) The most any changefeed's persisted checkpoint is behind the present",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
Expand All @@ -996,6 +997,15 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
}
return min
}
// functionalGaugeMaxFn aggregates a set of child gauge values by returning
// the largest one. The running maximum starts at 0, so zero and negative
// child values are never selected, and an empty input (or one with no
// positive values) yields 0.
functionalGaugeMaxFn := func(childValues []int64) int64 {
var max int64
for _, val := range childValues {
// max never drops below 0, so val > max already excludes val == 0; the
// explicit zero test mirrors the "skip unset children" intent.
if val != 0 && val > max {
max = val
}
}
return max
}

// NB: When adding new histograms, use sigFigs = 1. Older histograms
// retain significant figures of 2.
Expand Down Expand Up @@ -1087,9 +1097,10 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
SigFigs: 2,
BucketConfig: metric.ChangefeedBatchLatencyBuckets,
}),
SinkErrors: b.Counter(metaSinkErrors),
Timers: timers.New(histogramWindow),
NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"),
SinkErrors: b.Counter(metaSinkErrors),
MaxBehindNanos: b.FunctionalGauge(metaChangefeedMaxBehindNanos, functionalGaugeMaxFn),
Timers: timers.New(histogramWindow),
NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
Expand Down Expand Up @@ -1187,8 +1198,20 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
return minTs
}
}

// maxBehindNanosGetter returns a gauge callback that reports, in nanoseconds,
// how far the value produced by minTimestampGetter(m) trails the current time
// (timeutil.Now().UnixNano() - minTs).
//
// A minimum of 0 is passed through as 0 instead of being turned into a lag.
// NOTE(review): presumably 0 means no timestamp has been recorded in m yet;
// confirm against minTimestampGetter.
maxBehindNanosGetter := func(m map[int64]hlc.Timestamp) func() int64 {
return func() int64 {
minTs := minTimestampGetter(m)()
if minTs == 0 {
// Without this guard the gauge would report the full Unix time as lag.
return 0
}
return timeutil.Now().UnixNano() - minTs
}
}

sm.AggregatorProgress = a.AggregatorProgress.AddFunctionalChild(minTimestampGetter(sm.mu.resolved), scope)
sm.CheckpointProgress = a.CheckpointProgress.AddFunctionalChild(minTimestampGetter(sm.mu.checkpoint), scope)
sm.MaxBehindNanos = a.MaxBehindNanos.AddFunctionalChild(maxBehindNanosGetter(sm.mu.resolved), scope)

a.mu.sliMetrics[scope] = sm
return sm, nil
Expand Down Expand Up @@ -1245,14 +1268,10 @@ type Metrics struct {
ParallelConsumerConsumeNanos metric.IHistogram
ParallelConsumerInFlightEvents *metric.Gauge

// This map and the MaxBehindNanos metric are deprecated in favor of
// CheckpointProgress which is stored in the sliMetrics.
mu struct {
syncutil.Mutex
id int
resolved map[int]hlc.Timestamp
id int
}
MaxBehindNanos *metric.Gauge
}

// MetricStruct implements the metric.Struct interface.
Expand Down Expand Up @@ -1299,20 +1318,8 @@ func MakeMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) metric.Stru
ParallelConsumerInFlightEvents: metric.NewGauge(metaChangefeedEventConsumerInFlightEvents),
}

m.mu.resolved = make(map[int]hlc.Timestamp)
m.mu.id = 1 // start the first id at 1 so we can detect initialization
m.MaxBehindNanos = metric.NewFunctionalGauge(metaChangefeedMaxBehindNanos, func() int64 {
now := timeutil.Now()
var maxBehind time.Duration
m.mu.Lock()
defer m.mu.Unlock()
for _, resolved := range m.mu.resolved {
if behind := now.Sub(resolved.GoTime()); behind > maxBehind {
maxBehind = behind
}
}
return maxBehind.Nanoseconds()
})

return m
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/util/metric/prometheus_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (pm *PrometheusExporter) findOrCreateFamily(

// ScrapeRegistry scrapes all metrics contained in the registry to the metric
// family map, holding on only to the scraped data (which is no longer
// connected to the registry and metrics within) when returning from the the
// connected to the registry and metrics within) when returning from the
// call. It creates new families as needed.
func (pm *PrometheusExporter) ScrapeRegistry(registry *Registry, includeChildMetrics bool) {
labels := registry.GetLabels()
Expand Down

0 comments on commit 6e1a766

Please sign in to comment.