From 46422573dd0a595108c57cbd3b2a629091d20067 Mon Sep 17 00:00:00 2001 From: Mario Date: Tue, 24 Oct 2023 16:16:40 +0200 Subject: [PATCH] [servicegraphprocessor, servicegraphconnector] Measure latency in seconds instead of milliseconds (#27665) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **Description:** Measures latency in seconds instead of milliseconds, as the metric name indicates. Previously, milliseconds was used. This unit is still available via the feature gate `processor.servicegraph.legacyLatencyUnitMs`. This is a breaking change. **Link to tracking Issue:** #27488 **Testing:** Tests are updated **Documentation:** --------- Co-authored-by: Curtis Robert <92119472+crobert-1@users.noreply.github.com> Co-authored-by: Juraci Paixão Kröhling --- .chloggen/servicegraph-fix-time-unit.yaml | 31 ++++++++ processor/servicegraphprocessor/factory.go | 9 ++- .../servicegraphprocessor/factory_test.go | 12 +-- processor/servicegraphprocessor/processor.go | 41 +++++++--- .../servicegraphprocessor/processor_test.go | 75 +++++++++++++------ 5 files changed, 125 insertions(+), 43 deletions(-) create mode 100755 .chloggen/servicegraph-fix-time-unit.yaml diff --git a/.chloggen/servicegraph-fix-time-unit.yaml b/.chloggen/servicegraph-fix-time-unit.yaml new file mode 100755 index 000000000000..f9455bf2100b --- /dev/null +++ b/.chloggen/servicegraph-fix-time-unit.yaml @@ -0,0 +1,31 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: servicegraphprocessor, servicegraphconnector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Measure latency in seconds instead of milliseconds + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27488] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Measures latency in seconds instead of milliseconds, as the metric name indicates. + Previously, milliseconds was used. + This unit is still available via the feature gate `processor.servicegraph.legacyLatencyUnitMs`. + This is a breaking change. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/servicegraphprocessor/factory.go b/processor/servicegraphprocessor/factory.go index df0a664e0277..2175ef97f1e6 100644 --- a/processor/servicegraphprocessor/factory.go +++ b/processor/servicegraphprocessor/factory.go @@ -22,9 +22,10 @@ const ( connectorStability = component.StabilityLevelDevelopment virtualNodeFeatureGateID = "processor.servicegraph.virtualNode" legacyLatencyMetricNamesFeatureGateID = "processor.servicegraph.legacyLatencyMetricNames" + legacyLatencyUnitMs = "processor.servicegraph.legacyLatencyUnitMs" ) -var virtualNodeFeatureGate, legacyMetricNamesFeatureGate *featuregate.Gate +var virtualNodeFeatureGate, legacyMetricNamesFeatureGate, legacyLatencyUnitMsFeatureGate *featuregate.Gate func init() { virtualNodeFeatureGate = featuregate.GlobalRegistry().MustRegister( @@ -40,6 +41,12 @@ func init() { featuregate.WithRegisterDescription("When enabled, processor uses legacy latency metric names."), featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18743,https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16578"), ) + legacyLatencyUnitMsFeatureGate = featuregate.GlobalRegistry().MustRegister( + legacyLatencyUnitMs, + featuregate.StageAlpha, // Alpha because we want it disabled by default. + featuregate.WithRegisterDescription("When enabled, processor reports latency in milliseconds, instead of seconds."), + featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27488"), + ) } // NewFactory creates a factory for the servicegraph processor. diff --git a/processor/servicegraphprocessor/factory_test.go b/processor/servicegraphprocessor/factory_test.go index c94428ba0638..fa06c3699a1f 100644 --- a/processor/servicegraphprocessor/factory_test.go +++ b/processor/servicegraphprocessor/factory_test.go @@ -22,17 +22,17 @@ func TestNewProcessor(t *testing.T) { }{ { name: "simplest config (use defaults)", - expectedLatencyHistogramBuckets: defaultLatencyHistogramBucketsMs, + expectedLatencyHistogramBuckets: defaultLatencyHistogramBuckets, }, { name: "latency histogram configured with catch-all bucket to check no additional catch-all bucket inserted", latencyHistogramBuckets: []time.Duration{2 * time.Millisecond}, - expectedLatencyHistogramBuckets: []float64{2}, + expectedLatencyHistogramBuckets: []float64{0.002}, }, { name: "full config with no catch-all bucket and check the catch-all bucket is inserted", latencyHistogramBuckets: []time.Duration{2 * time.Millisecond}, - expectedLatencyHistogramBuckets: []float64{2}, + expectedLatencyHistogramBuckets: []float64{0.002}, }, } { t.Run(tc.name, func(t *testing.T) { @@ -64,17 +64,17 @@ func TestNewConnector(t *testing.T) { }{ { name: "simplest config (use defaults)", - expectedLatencyHistogramBuckets: defaultLatencyHistogramBucketsMs, + expectedLatencyHistogramBuckets: defaultLatencyHistogramBuckets, }, { name: "latency histogram configured with catch-all bucket to check no additional catch-all bucket inserted", latencyHistogramBuckets: []time.Duration{2 * time.Millisecond}, - expectedLatencyHistogramBuckets: []float64{2}, + expectedLatencyHistogramBuckets: []float64{0.002}, }, { name: "full config with no catch-all bucket and check the catch-all bucket is inserted", latencyHistogramBuckets: []time.Duration{2 * time.Millisecond}, - expectedLatencyHistogramBuckets: []float64{2}, + expectedLatencyHistogramBuckets: []float64{0.002}, }, } { t.Run(tc.name, func(t *testing.T) { diff --git a/processor/servicegraphprocessor/processor.go b/processor/servicegraphprocessor/processor.go index a68314ea0233..28fb5ca8893a 100644 --- a/processor/servicegraphprocessor/processor.go +++ b/processor/servicegraphprocessor/processor.go @@ -33,9 +33,13 @@ const ( ) var ( - defaultLatencyHistogramBucketsMs = []float64{ + legacyDefaultLatencyHistogramBuckets = []float64{ 2, 4, 6, 8, 10, 50, 100, 200, 400, 800, 1000, 1400, 2000, 5000, 10_000, 15_000, } + defaultLatencyHistogramBuckets = []float64{ + 0.002, 0.004, 0.006, 0.008, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1, 1.4, 2, 5, 10, 15, + } + defaultPeerAttributes = []string{ semconv.AttributeDBName, semconv.AttributeNetSockPeerAddr, semconv.AttributeNetPeerName, semconv.AttributeRPCService, semconv.AttributeNetSockPeerName, semconv.AttributeNetPeerName, semconv.AttributeHTTPURL, semconv.AttributeHTTPTarget, } @@ -78,9 +82,12 @@ type serviceGraphProcessor struct { func newProcessor(logger *zap.Logger, config component.Config) *serviceGraphProcessor { pConfig := config.(*Config) - bounds := defaultLatencyHistogramBucketsMs + bounds := defaultLatencyHistogramBuckets + if legacyLatencyUnitMsFeatureGate.IsEnabled() { + bounds = legacyDefaultLatencyHistogramBuckets + } if pConfig.LatencyHistogramBuckets != nil { - bounds = mapDurationsToMillis(pConfig.LatencyHistogramBuckets) + bounds = mapDurationsToFloat(pConfig.LatencyHistogramBuckets) } if pConfig.CacheLoop <= 0 { @@ -254,7 +261,7 @@ func (p *serviceGraphProcessor) aggregateMetrics(ctx context.Context, td ptrace. e.TraceID = traceID e.ConnectionType = connectionType e.ClientService = serviceName - e.ClientLatencySec = float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds()) + e.ClientLatencySec = spanDuration(span) e.Failed = e.Failed || span.Status().Code() == ptrace.StatusCodeError p.upsertDimensions(clientKind, e.Dimensions, rAttributes, span.Attributes()) @@ -267,7 +274,7 @@ func (p *serviceGraphProcessor) aggregateMetrics(ctx context.Context, td ptrace. if dbName, ok := findAttributeValue(semconv.AttributeDBName, rAttributes, span.Attributes()); ok { e.ConnectionType = store.Database e.ServerService = dbName - e.ServerLatencySec = float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds()) + e.ServerLatencySec = spanDuration(span) } }) case ptrace.SpanKindConsumer: @@ -281,7 +288,7 @@ func (p *serviceGraphProcessor) aggregateMetrics(ctx context.Context, td ptrace. e.TraceID = traceID e.ConnectionType = connectionType e.ServerService = serviceName - e.ServerLatencySec = float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds()) + e.ServerLatencySec = spanDuration(span) e.Failed = e.Failed || span.Status().Code() == ptrace.StatusCodeError p.upsertDimensions(serverKind, e.Dimensions, rAttributes, span.Attributes()) }) @@ -655,16 +662,26 @@ func (p *serviceGraphProcessor) cleanCache() { p.seriesMutex.Unlock() } -// durationToMillis converts the given duration to the number of milliseconds it represents. -// Note that this can return sub-millisecond (i.e. < 1ms) values as well. -func durationToMillis(d time.Duration) float64 { - return float64(d.Nanoseconds()) / float64(time.Millisecond.Nanoseconds()) +// spanDuration returns the duration of the given span in seconds (legacy ms). +func spanDuration(span ptrace.Span) float64 { + if legacyLatencyUnitMsFeatureGate.IsEnabled() { + return float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds()) + } + return float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Second.Nanoseconds()) +} + +// durationToFloat converts the given duration to the number of seconds (legacy ms) it represents. +func durationToFloat(d time.Duration) float64 { + if legacyLatencyUnitMsFeatureGate.IsEnabled() { + return float64(d.Milliseconds()) + } + return d.Seconds() } -func mapDurationsToMillis(vs []time.Duration) []float64 { +func mapDurationsToFloat(vs []time.Duration) []float64 { vsm := make([]float64, len(vs)) for i, v := range vs { - vsm[i] = durationToMillis(v) + vsm[i] = durationToFloat(v) } return vsm } diff --git a/processor/servicegraphprocessor/processor_test.go b/processor/servicegraphprocessor/processor_test.go index 159fc5916f87..9dd0e8949c3c 100644 --- a/processor/servicegraphprocessor/processor_test.go +++ b/processor/servicegraphprocessor/processor_test.go @@ -123,12 +123,10 @@ func TestConnectorShutdown(t *testing.T) { } func TestProcessorConsume(t *testing.T) { - // set virtual node feature - _ = featuregate.GlobalRegistry().Set(virtualNodeFeatureGate.ID(), true) - for _, tc := range []struct { name string cfg Config + gates []*featuregate.Gate sampleTraces ptrace.Traces verifyMetrics func(t *testing.T, md pmetric.Metrics) }{ @@ -155,6 +153,7 @@ func TestProcessorConsume(t *testing.T) { TTL: time.Nanosecond, }, }, + gates: []*featuregate.Gate{virtualNodeFeatureGate}, sampleTraces: incompleteClientTraces(), verifyMetrics: func(t *testing.T, md pmetric.Metrics) { v, ok := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().Get("server") @@ -172,6 +171,7 @@ func TestProcessorConsume(t *testing.T) { TTL: time.Nanosecond, }, }, + gates: []*featuregate.Gate{virtualNodeFeatureGate}, sampleTraces: incompleteServerTraces(false), verifyMetrics: func(t *testing.T, md pmetric.Metrics) { v, ok := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().Get("client") @@ -189,13 +189,32 @@ func TestProcessorConsume(t *testing.T) { TTL: time.Nanosecond, }, }, + gates: []*featuregate.Gate{virtualNodeFeatureGate}, sampleTraces: incompleteServerTraces(true), verifyMetrics: func(t *testing.T, md pmetric.Metrics) { assert.Equal(t, 0, md.MetricCount()) }, }, + { + name: "complete traces with legacy latency metrics", + cfg: Config{ + MetricsExporter: "mock", + Dimensions: []string{"some-attribute", "non-existing-attribute"}, + Store: StoreConfig{ + MaxItems: 10, + TTL: time.Nanosecond, + }, + }, sampleTraces: buildSampleTrace(t, "val"), + gates: []*featuregate.Gate{virtualNodeFeatureGate, legacyLatencyUnitMsFeatureGate}, + verifyMetrics: verifyHappyCaseMetricsWithDuration(1000), + }, } { t.Run(tc.name, func(t *testing.T) { + // Set feature gates + for _, gate := range tc.gates { + require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), true)) + } + // Prepare p := newProcessor(zaptest.NewLogger(t), &tc.cfg) p.tracesConsumer = consumertest.NewNop() @@ -224,11 +243,13 @@ func TestProcessorConsume(t *testing.T) { // Shutdown the processor assert.NoError(t, p.Shutdown(context.Background())) + + // Unset feature gates + for _, gate := range tc.gates { + require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), false)) + } }) } - - // unset virtual node feature - _ = featuregate.GlobalRegistry().Set(virtualNodeFeatureGate.ID(), false) } func TestConnectorConsume(t *testing.T) { @@ -296,27 +317,33 @@ func TestProcessor_MetricsFlushInterval(t *testing.T) { } func verifyHappyCaseMetrics(t *testing.T, md pmetric.Metrics) { - assert.Equal(t, 3, md.MetricCount()) + verifyHappyCaseMetricsWithDuration(1)(t, md) +} - rms := md.ResourceMetrics() - assert.Equal(t, 1, rms.Len()) +func verifyHappyCaseMetricsWithDuration(durationSum float64) func(t *testing.T, md pmetric.Metrics) { + return func(t *testing.T, md pmetric.Metrics) { + assert.Equal(t, 3, md.MetricCount()) - sms := rms.At(0).ScopeMetrics() - assert.Equal(t, 1, sms.Len()) + rms := md.ResourceMetrics() + assert.Equal(t, 1, rms.Len()) - ms := sms.At(0).Metrics() - assert.Equal(t, 3, ms.Len()) + sms := rms.At(0).ScopeMetrics() + assert.Equal(t, 1, sms.Len()) - mCount := ms.At(0) - verifyCount(t, mCount) + ms := sms.At(0).Metrics() + assert.Equal(t, 3, ms.Len()) - mServerDuration := ms.At(1) - assert.Equal(t, "traces_service_graph_request_server_seconds", mServerDuration.Name()) - verifyDuration(t, mServerDuration) + mCount := ms.At(0) + verifyCount(t, mCount) - mClientDuration := ms.At(2) - assert.Equal(t, "traces_service_graph_request_client_seconds", mClientDuration.Name()) - verifyDuration(t, mClientDuration) + mServerDuration := ms.At(1) + assert.Equal(t, "traces_service_graph_request_server_seconds", mServerDuration.Name()) + verifyDuration(t, mServerDuration, durationSum) + + mClientDuration := ms.At(2) + assert.Equal(t, "traces_service_graph_request_client_seconds", mClientDuration.Name()) + verifyDuration(t, mClientDuration, durationSum) + } } func verifyCount(t *testing.T, m pmetric.Metric) { @@ -339,13 +366,13 @@ func verifyCount(t *testing.T, m pmetric.Metric) { verifyAttr(t, attributes, "client_some-attribute", "val") } -func verifyDuration(t *testing.T, m pmetric.Metric) { +func verifyDuration(t *testing.T, m pmetric.Metric, durationSum float64) { assert.Equal(t, pmetric.MetricTypeHistogram, m.Type()) dps := m.Histogram().DataPoints() assert.Equal(t, 1, dps.Len()) dp := dps.At(0) - assert.Equal(t, float64(1000), dp.Sum()) // Duration: 1sec + assert.Equal(t, durationSum, dp.Sum()) // Duration: 1sec assert.Equal(t, uint64(1), dp.Count()) buckets := pcommon.NewUInt64Slice() buckets.FromRaw([]uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}) @@ -526,7 +553,7 @@ func TestUpdateDurationMetrics(t *testing.T) { reqClientDurationSecondsSum: make(map[string]float64), reqClientDurationSecondsCount: make(map[string]uint64), reqClientDurationSecondsBucketCounts: make(map[string][]uint64), - reqDurationBounds: defaultLatencyHistogramBucketsMs, + reqDurationBounds: defaultLatencyHistogramBuckets, keyToMetric: make(map[string]metricSeries), config: &Config{ Dimensions: []string{},