From 1ac905a3f1c867e17d5831709c35eb44c0b573db Mon Sep 17 00:00:00 2001 From: Miles Frankel Date: Mon, 7 Oct 2024 11:04:22 -0400 Subject: [PATCH] changefeedccl: improve sink error observability Add sink error metric (`changefeed.sink_errors`) and expand reporting of internal retries metric (`changefeed.internal_retry_message_count`) to all sinks that perform internal retries. Part of: #127784 Release note (enterprise change): Add sink error metric (`changefeed.sink_errors`) and expand reporting of internal retries metric (`changefeed.internal_retry_message_count`) to all sinks that perform internal retries. --- docs/generated/metrics/metrics.html | 1 + pkg/ccl/changefeedccl/event_processing.go | 4 ++++ pkg/ccl/changefeedccl/metrics.go | 10 ++++++++ pkg/ccl/changefeedccl/sink_webhook.go | 28 ++++++++++++++++------- 4 files changed, 35 insertions(+), 8 deletions(-) diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html index 3f0d347fa423..6e4ae59ab64b 100644 --- a/docs/generated/metrics/metrics.html +++ b/docs/generated/metrics/metrics.html @@ -898,6 +898,7 @@ APPLICATIONchangefeed.schemafeed.table_history_scansThe number of table history scans during pollingCountsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONchangefeed.schemafeed.table_metadata_nanosTime blocked while verifying table metadata historiesNanosecondsCOUNTERNANOSECONDSAVGNON_NEGATIVE_DERIVATIVE APPLICATIONchangefeed.sink_batch_hist_nanosTime spent batched in the sink buffer before being flushed and acknowledgedChangefeedsHISTOGRAMNANOSECONDSAVGNONE +APPLICATIONchangefeed.sink_errorsNumber of changefeed errors caused by the sinkCountCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONchangefeed.sink_io_inflightThe number of keys currently inflight as IO requests being sent to the sinkMessagesGAUGECOUNTAVGNONE APPLICATIONchangefeed.size_based_flushesTotal size based flushes across all feedsFlushesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONchangefeed.total_rangesThe total number of ranges being watched by changefeed aggregatorsRangesGAUGECOUNTAVGNONE diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 3cd9d6187da6..522efde052d5 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -453,6 +453,10 @@ func (c *kvEventToRowConsumer) encodeAndEmit( ) }) if err != nil { + if !errors.Is(err, context.Canceled) { + log.Warningf(ctx, `sink failed to emit row: %v`, err) + c.metrics.SinkErrors.Inc(1) + } return err } if log.V(3) { diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 1c24026615b7..17a1e76b6004 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -85,6 +85,7 @@ type AggMetrics struct { TotalRanges *aggmetric.AggGauge CloudstorageBufferedBytes *aggmetric.AggGauge KafkaThrottlingNanos *aggmetric.AggHistogram + SinkErrors *aggmetric.AggCounter Timers *timers.Timers @@ -165,6 +166,7 @@ type sliMetrics struct { TotalRanges *aggmetric.Gauge CloudstorageBufferedBytes *aggmetric.Gauge KafkaThrottlingNanos *aggmetric.Histogram + SinkErrors *aggmetric.Counter Timers *timers.ScopedTimers @@ -968,6 +970,12 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag Measurement: "Nanoseconds", Unit: metric.Unit_NANOSECONDS, } + metaSinkErrors := metric.Metadata{ + Name: "changefeed.sink_errors", + Help: "Number of changefeed errors caused by the sink", + Measurement: "Count", + Unit: metric.Unit_COUNT, + } functionalGaugeMinFn := func(childValues []int64) int64 { var min int64 @@ -1069,6 +1077,7 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag SigFigs: 2, BucketConfig: metric.BatchProcessLatencyBuckets, }), + SinkErrors: b.Counter(metaSinkErrors), Timers: timers.New(histogramWindow), NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"), } @@ -1139,6 +1148,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { TotalRanges: a.TotalRanges.AddChild(scope), CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope), KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope), + SinkErrors: a.SinkErrors.AddChild(scope), Timers: a.Timers.GetOrCreateScopedTimers(scope), diff --git a/pkg/ccl/changefeedccl/sink_webhook.go b/pkg/ccl/changefeedccl/sink_webhook.go index 3b40b30a6d4d..93b946dc186b 100644 --- a/pkg/ccl/changefeedccl/sink_webhook.go +++ b/pkg/ccl/changefeedccl/sink_webhook.go @@ -77,15 +77,17 @@ type webhookSinkPayload struct { } type encodedPayload struct { - data []byte - alloc kvevent.Alloc - emitTime time.Time - mvcc hlc.Timestamp + data []byte + alloc kvevent.Alloc + emitTime time.Time + mvcc hlc.Timestamp + recordCount int } func encodePayloadJSONWebhook(messages []deprecatedMessagePayload) (encodedPayload, error) { result := encodedPayload{ - emitTime: timeutil.Now(), + emitTime: timeutil.Now(), + recordCount: len(messages), } payload := make([]json.RawMessage, len(messages)) @@ -549,7 +551,7 @@ func (s *deprecatedWebhookSink) workerLoop(workerIndex int) { s.exitWorkersWithError(err) return } - if err := s.sendMessageWithRetries(s.workerCtx, encoded.data); err != nil { + if err := s.sendMessageWithRetries(s.workerCtx, encoded.data, encoded.recordCount); err != nil { s.exitWorkersWithError(err) return } @@ -560,9 +562,19 @@ func (s *deprecatedWebhookSink) workerLoop(workerIndex int) { } } -func (s *deprecatedWebhookSink) sendMessageWithRetries(ctx context.Context, reqBody []byte) error { +func (s *deprecatedWebhookSink) sendMessageWithRetries( + ctx context.Context, reqBody []byte, recordCount int, +) error { + firstTry := true requestFunc := func() error { + if firstTry { + firstTry = false + } else { + s.metrics.recordInternalRetry(int64(recordCount), false) + } + return s.sendMessage(ctx, reqBody) + } return retry.WithMaxAttempts(ctx, s.retryCfg, s.retryCfg.MaxRetries+1, requestFunc) } @@ -685,7 +697,7 @@ func (s *deprecatedWebhookSink) EmitResolvedTimestamp( // do worker logic directly here instead (there's no point using workers for // resolved timestamps since there are no keys and everything must be // in order) - if err := s.sendMessageWithRetries(ctx, payload); err != nil { + if err := s.sendMessageWithRetries(ctx, payload, 1); err != nil { s.exitWorkersWithError(err) return err }