Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[connector/spanmetrics] Fix spanmetrics for child spans #36718

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 8 additions & 19 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,17 +279,6 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
* - For delta metrics: (T1, T2), (T2, T3), (T3, T4) ...
*/
deltaMetricKeys := make(map[metrics.Key]bool)
startTimeGenerator := func(mk metrics.Key) pcommon.Timestamp {
startTime := rawMetrics.startTimestamp
if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
if lastTimestamp, ok := p.lastDeltaTimestamps.Get(mk); ok {
startTime = lastTimestamp
}
// Collect lastDeltaTimestamps keys that need to be updated. Metrics can share the same key, so defer the update.
deltaMetricKeys[mk] = true
}
return startTime
}

metricsNamespace := p.config.Namespace
if legacyMetricNamesFeatureGate.IsEnabled() && metricsNamespace == DefaultNamespace {
Expand All @@ -299,21 +288,21 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
sums := rawMetrics.sums
metric := sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(metricsNamespace, metricNameCalls))
sums.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
sums.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality())

if !p.config.Histogram.Disable {
histograms := rawMetrics.histograms
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(metricsNamespace, metricNameDuration))
metric.SetUnit(p.config.Histogram.Unit.String())
histograms.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
histograms.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality())
}

events := rawMetrics.events
if p.events.Enabled {
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(metricsNamespace, metricNameEvents))
events.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
events.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality())
}

for mk := range deltaMetricKeys {
Expand Down Expand Up @@ -406,12 +395,12 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
}
if !p.config.Histogram.Disable {
// aggregate histogram metrics
h := histograms.GetOrCreate(key, attributes)
h := histograms.GetOrCreate(key, attributes, startTimestamp)
p.addExemplar(span, duration, h)
h.Observe(duration)
}
// aggregate sums metrics
s := sums.GetOrCreate(key, attributes)
s := sums.GetOrCreate(key, attributes, startTimestamp)
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
s.AddExemplar(span.TraceID(), span.SpanID(), duration)
}
Expand All @@ -435,7 +424,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
eAttributes = p.buildAttributes(serviceName, span, rscAndEventAttrs, eDimensions)
p.metricKeyToDimensions.Add(eKey, eAttributes)
}
e := events.GetOrCreate(eKey, eAttributes)
e := events.GetOrCreate(eKey, eAttributes, startTimestamp)
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
e.AddExemplar(span.TraceID(), span.SpanID(), duration)
}
Expand Down Expand Up @@ -479,8 +468,8 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimesta
if !ok {
v = &resourceMetrics{
histograms: initHistogramMetrics(p.config),
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint, startTimestamp),
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint, startTimestamp),
attributes: attr,
startTimestamp: startTimestamp,
}
Expand Down
73 changes: 51 additions & 22 deletions connector/spanmetricsconnector/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
type Key string

type HistogramMetrics interface {
GetOrCreate(key Key, attributes pcommon.Map) Histogram
BuildMetrics(pmetric.Metric, generateStartTimestamp, pcommon.Timestamp, pmetric.AggregationTemporality)
GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) Histogram
BuildMetrics(pmetric.Metric, pcommon.Timestamp, pmetric.AggregationTemporality)
ClearExemplars()
}

Expand Down Expand Up @@ -47,6 +47,9 @@ type explicitHistogram struct {
bounds []float64

maxExemplarCount *int

startTimestamp pcommon.Timestamp
lastSeenTimestamp pcommon.Timestamp
}

type exponentialHistogram struct {
Expand All @@ -56,6 +59,9 @@ type exponentialHistogram struct {
histogram *structure.Histogram[float64]

maxExemplarCount *int

startTimestamp pcommon.Timestamp
lastSeenTimestamp pcommon.Timestamp
}

type generateStartTimestamp = func(Key) pcommon.Timestamp
Expand All @@ -76,7 +82,7 @@ func NewExplicitHistogramMetrics(bounds []float64, maxExemplarCount *int) Histog
}
}

func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram {
func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) Histogram {
h, ok := m.metrics[key]
if !ok {
h = &explicitHistogram{
Expand All @@ -89,21 +95,27 @@ func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map)
m.metrics[key] = h
}

h.lastSeenTimestamp = startTimestamp
return h
}

func (m *explicitHistogramMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
metric.SetEmptyHistogram().SetAggregationTemporality(temporality)
dps := metric.Histogram().DataPoints()
dps.EnsureCapacity(len(m.metrics))
for k, h := range m.metrics {
for _, h := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
var startTimeStamp pcommon.Timestamp
if temporality == pmetric.AggregationTemporalityDelta {
startTimeStamp = h.lastSeenTimestamp
} else {
startTimeStamp = h.startTimestamp
}
dp.SetStartTimestamp(startTimeStamp)
dp.SetTimestamp(timestamp)
dp.ExplicitBounds().FromRaw(h.bounds)
dp.BucketCounts().FromRaw(h.bucketCounts)
Expand All @@ -123,7 +135,7 @@ func (m *explicitHistogramMetrics) ClearExemplars() {
}
}

func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram {
func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimeStamp pcommon.Timestamp) Histogram {
h, ok := m.metrics[key]
if !ok {
histogram := new(structure.Histogram[float64])
Expand All @@ -141,28 +153,34 @@ func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Ma
m.metrics[key] = h
}

h.lastSeenTimestamp = startTimeStamp
return h
}

func (m *exponentialHistogramMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
metric.SetEmptyExponentialHistogram().SetAggregationTemporality(temporality)
dps := metric.ExponentialHistogram().DataPoints()
dps.EnsureCapacity(len(m.metrics))
for k, m := range m.metrics {
for _, e := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
var startTimeStamp pcommon.Timestamp
if temporality == pmetric.AggregationTemporalityDelta {
startTimeStamp = e.lastSeenTimestamp
} else {
startTimeStamp = e.startTimestamp
}
dp.SetStartTimestamp(startTimeStamp)
dp.SetTimestamp(timestamp)
expoHistToExponentialDataPoint(m.histogram, dp)
for i := 0; i < m.exemplars.Len(); i++ {
m.exemplars.At(i).SetTimestamp(timestamp)
expoHistToExponentialDataPoint(e.histogram, dp)
for i := 0; i < e.exemplars.Len(); i++ {
e.exemplars.At(i).SetTimestamp(timestamp)
}
m.exemplars.CopyTo(dp.Exemplars())
m.attributes.CopyTo(dp.Attributes())
e.exemplars.CopyTo(dp.Exemplars())
e.attributes.CopyTo(dp.Attributes())
}
}

Expand Down Expand Up @@ -237,17 +255,21 @@ func (h *exponentialHistogram) AddExemplar(traceID pcommon.TraceID, spanID pcomm
}

type Sum struct {
attributes pcommon.Map
count uint64
attributes pcommon.Map
count uint64

exemplars pmetric.ExemplarSlice
maxExemplarCount *int

startTimestamp pcommon.Timestamp
lastSeenTimestamp pcommon.Timestamp
}

func (s *Sum) Add(value uint64) {
s.count += value
}

func NewSumMetrics(maxExemplarCount *int) SumMetrics {
func NewSumMetrics(maxExemplarCount *int, startTimeStamp pcommon.Timestamp) SumMetrics {
return SumMetrics{
metrics: make(map[Key]*Sum),
maxExemplarCount: maxExemplarCount,
Expand All @@ -259,16 +281,18 @@ type SumMetrics struct {
maxExemplarCount *int
}

func (m *SumMetrics) GetOrCreate(key Key, attributes pcommon.Map) *Sum {
func (m *SumMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) *Sum {
s, ok := m.metrics[key]
if !ok {
s = &Sum{
attributes: attributes,
exemplars: pmetric.NewExemplarSlice(),
maxExemplarCount: m.maxExemplarCount,
startTimestamp: startTimestamp,
}
m.metrics[key] = s
}
s.lastSeenTimestamp = startTimestamp
return s
}

Expand All @@ -284,7 +308,6 @@ func (s *Sum) AddExemplar(traceID pcommon.TraceID, spanID pcommon.SpanID, value

func (m *SumMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
Expand All @@ -293,9 +316,15 @@ func (m *SumMetrics) BuildMetrics(

dps := metric.Sum().DataPoints()
dps.EnsureCapacity(len(m.metrics))
for k, s := range m.metrics {
for _, s := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
var startTimeStamp pcommon.Timestamp
if temporality == pmetric.AggregationTemporalityDelta {
startTimeStamp = s.lastSeenTimestamp
} else {
startTimeStamp = s.startTimestamp
}
dp.SetStartTimestamp(startTimeStamp)
dp.SetTimestamp(timestamp)
dp.SetIntValue(int64(s.count))
for i := 0; i < s.exemplars.Len(); i++ {
Expand Down
Loading