Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: add scoping support for max_behind_nanos #137534

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@
<tr><td>APPLICATION</td><td>changefeed.internal_retry_message_count</td><td>Number of messages for which an attempt to retry them within an aggregator node was made</td><td>Messages</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.kafka_throttling_hist_nanos</td><td>Time spent in throttling due to exceeding kafka quota</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.lagging_ranges</td><td>The number of ranges considered to be lagging behind</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.max_behind_nanos</td><td>(Deprecated in favor of checkpoint_progress) The most any changefeed&#39;s persisted checkpoint is behind the present</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.max_behind_nanos</td><td>The most any changefeed&#39;s persisted checkpoint is behind the present</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.message_size_hist</td><td>Message size histogram</td><td>Bytes</td><td>HISTOGRAM</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.messages.messages_pushback_nanos</td><td>Total time spent throttled for messages quota</td><td>Nanoseconds</td><td>COUNTER</td><td>NANOSECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.network.bytes_in</td><td>The number of bytes received from the network by changefeeds</td><td>Bytes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
18 changes: 2 additions & 16 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,14 +1233,14 @@ func newChangeFrontierProcessor(
return nil, err
}

sliMertics, err := flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics(cf.spec.Feed.Opts[changefeedbase.OptMetricsScope])
sliMetrics, err := flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics(cf.spec.Feed.Opts[changefeedbase.OptMetricsScope])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😂

if err != nil {
return nil, err
}

if cf.encoder, err = getEncoder(
ctx, encodingOpts, AllTargets(spec.Feed), spec.Feed.Select != "",
makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMertics,
makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMetrics,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1467,18 +1467,13 @@ func (cf *changeFrontier) close() {
}
}

// closeMetrics de-registers from the progress registry that powers
// `changefeed.max_behind_nanos`. This method is idempotent.
func (cf *changeFrontier) closeMetrics() {
// Delete this feed from the MaxBehindNanos metric so it's no longer
// considered by the gauge.
func() {
cf.metrics.mu.Lock()
defer cf.metrics.mu.Unlock()
if cf.metricsID > 0 {
andyyang890 marked this conversation as resolved.
Show resolved Hide resolved
cf.sliMetrics.RunningCount.Dec(1)
}
delete(cf.metrics.mu.resolved, cf.metricsID)
cf.metricsID = -1
}()

Expand Down Expand Up @@ -1640,15 +1635,6 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
// all feeds in the scope.
cf.sliMetrics.setCheckpoint(cf.sliMetricsID, newResolved)

// This backs max_behind_nanos which is deprecated in favor of checkpoint_progress
func() {
cf.metrics.mu.Lock()
defer cf.metrics.mu.Unlock()
if cf.metricsID != -1 {
cf.metrics.mu.resolved[cf.metricsID] = newResolved
}
}()

return cf.maybeEmitResolved(newResolved)
}

Expand Down
17 changes: 16 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,20 @@ func TestChangefeedProgressMetrics(t *testing.T) {
})
}

// Verify that max_behind_nanos has recurring updates
var lastValue int64 = 0
for i := 0; i < 3; i++ {
testutils.SucceedsSoon(t, func() error {
value := sliA.MaxBehindNanos.Value()
if value != lastValue {
lastValue = value
return nil
}
return errors.Newf("waiting for max_behind_nanos to update %d",
lastValue)
})
}

sliB, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("label_b")
require.Equal(t, int64(0), sliB.AggregatorProgress.Value())
fooB := feed(t, f, `CREATE CHANGEFEED FOR foo WITH metrics_label='label_b', resolved='100ms'`)
Expand All @@ -450,7 +464,8 @@ func TestChangefeedProgressMetrics(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
aggregatorProgress := sliA.AggregatorProgress.Value()
checkpointProgress := sliA.CheckpointProgress.Value()
if aggregatorProgress == 0 && checkpointProgress == 0 {
maxBehindNanos := sliA.MaxBehindNanos.Value()
if aggregatorProgress == 0 && checkpointProgress == 0 && maxBehindNanos == 0 {
return nil
}
return errors.Newf("waiting for progress metrics to be 0 (ap=%d, cp=%d)",
Expand Down
70 changes: 38 additions & 32 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package changefeedccl

import (
"context"
"slices"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -83,6 +84,7 @@ type AggMetrics struct {
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram
SinkErrors *aggmetric.AggCounter
MaxBehindNanos *aggmetric.AggGauge

Timers *timers.Timers

Expand Down Expand Up @@ -165,6 +167,7 @@ type sliMetrics struct {
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram
SinkErrors *aggmetric.Counter
MaxBehindNanos *aggmetric.Gauge

Timers *timers.ScopedTimers

Expand Down Expand Up @@ -721,17 +724,6 @@ var (
Unit: metric.Unit_NANOSECONDS,
}

// TODO(dan): This was intended to be a measure of the minimum distance of
// any changefeed ahead of its gc ttl threshold, but keeping that correct in
// the face of changing zone configs is much harder, so this will have to do
// for now.
metaChangefeedMaxBehindNanos = metric.Metadata{
Name: "changefeed.max_behind_nanos",
Help: "(Deprecated in favor of checkpoint_progress) The most any changefeed's persisted checkpoint is behind the present",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

metaChangefeedFrontierUpdates = metric.Metadata{
Name: "changefeed.frontier_updates",
Help: "Number of change frontier updates across all feeds",
Expand Down Expand Up @@ -986,6 +978,16 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
Measurement: "Count",
Unit: metric.Unit_COUNT,
}
// TODO(dan): This was intended to be a measure of the minimum distance of
// any changefeed ahead of its gc ttl threshold, but keeping that correct in
// the face of changing zone configs is much harder, so this will have to do
// for now.
metaChangefeedMaxBehindNanos := metric.Metadata{
Name: "changefeed.max_behind_nanos",
Help: "The most any changefeed's persisted checkpoint is behind the present",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
Expand All @@ -997,6 +999,13 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
return min
}

functionalGaugeMaxFn := func(childValues []int64) int64 {
if len(childValues) == 0 {
return 0
}
return slices.Max(childValues)
}

// NB: When adding new histograms, use sigFigs = 1. Older histograms
// retain significant figures of 2.
b := aggmetric.MakeBuilder("scope")
Expand Down Expand Up @@ -1087,9 +1096,10 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
SigFigs: 2,
BucketConfig: metric.ChangefeedBatchLatencyBuckets,
}),
SinkErrors: b.Counter(metaSinkErrors),
Timers: timers.New(histogramWindow),
NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"),
SinkErrors: b.Counter(metaSinkErrors),
MaxBehindNanos: b.FunctionalGauge(metaChangefeedMaxBehindNanos, functionalGaugeMaxFn),
Timers: timers.New(histogramWindow),
NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
Expand Down Expand Up @@ -1187,8 +1197,20 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
return minTs
}
}

maxBehindNanosGetter := func(m map[int64]hlc.Timestamp) func() int64 {
return func() int64 {
minTs := minTimestampGetter(m)()
if minTs == 0 {
return 0
}
return timeutil.Now().UnixNano() - minTs
}
}

sm.AggregatorProgress = a.AggregatorProgress.AddFunctionalChild(minTimestampGetter(sm.mu.resolved), scope)
sm.CheckpointProgress = a.CheckpointProgress.AddFunctionalChild(minTimestampGetter(sm.mu.checkpoint), scope)
sm.MaxBehindNanos = a.MaxBehindNanos.AddFunctionalChild(maxBehindNanosGetter(sm.mu.resolved), scope)

a.mu.sliMetrics[scope] = sm
return sm, nil
Expand Down Expand Up @@ -1245,14 +1267,10 @@ type Metrics struct {
ParallelConsumerConsumeNanos metric.IHistogram
ParallelConsumerInFlightEvents *metric.Gauge

// This map and the MaxBehindNanos metric are deprecated in favor of
// CheckpointProgress which is stored in the sliMetrics.
mu struct {
syncutil.Mutex
id int
resolved map[int]hlc.Timestamp
id int
}
MaxBehindNanos *metric.Gauge
}

// MetricStruct implements the metric.Struct interface.
Expand Down Expand Up @@ -1299,20 +1317,8 @@ func MakeMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) metric.Stru
ParallelConsumerInFlightEvents: metric.NewGauge(metaChangefeedEventConsumerInFlightEvents),
}

m.mu.resolved = make(map[int]hlc.Timestamp)
m.mu.id = 1 // start the first id at 1 so we can detect initialization
m.MaxBehindNanos = metric.NewFunctionalGauge(metaChangefeedMaxBehindNanos, func() int64 {
now := timeutil.Now()
var maxBehind time.Duration
m.mu.Lock()
defer m.mu.Unlock()
for _, resolved := range m.mu.resolved {
if behind := now.Sub(resolved.GoTime()); behind > maxBehind {
maxBehind = behind
}
}
return maxBehind.Nanoseconds()
})

return m
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/util/metric/prometheus_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (pm *PrometheusExporter) findOrCreateFamily(

// ScrapeRegistry scrapes all metrics contained in the registry to the metric
// family map, holding on only to the scraped data (which is no longer
// connected to the registry and metrics within) when returning from the the
// connected to the registry and metrics within) when returning from the
// call. It creates new families as needed.
func (pm *PrometheusExporter) ScrapeRegistry(registry *Registry, includeChildMetrics bool) {
labels := registry.GetLabels()
Expand Down
Loading