diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go
index 4bf25c198920..ce69321cadd9 100644
--- a/internal/exp/metrics/staleness/staleness.go
+++ b/internal/exp/metrics/staleness/staleness.go
@@ -93,3 +93,7 @@ func (s *Staleness[T]) Evict() identity.Stream {
 	s.items.Delete(id)
 	return id
 }
+
+func (s *Staleness[T]) Clear() {
+	s.items.Clear()
+}
diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go
index 98fec276238e..92b75d7ac0ee 100644
--- a/internal/exp/metrics/staleness/staleness_test.go
+++ b/internal/exp/metrics/staleness/staleness_test.go
@@ -17,7 +17,7 @@ func TestStaleness(t *testing.T) {
 	max := 1 * time.Second
 	stalenessMap := NewStaleness[int](
 		max,
-		make(streams.HashMap[int]),
+		&streams.HashMap[int]{},
 	)
 
 	idA := generateStreamID(t, map[string]any{
diff --git a/internal/exp/metrics/streams/streams.go b/internal/exp/metrics/streams/streams.go
index 90bebb63c091..35427496c165 100644
--- a/internal/exp/metrics/streams/streams.go
+++ b/internal/exp/metrics/streams/streams.go
@@ -16,29 +16,30 @@ type Map[T any] interface {
 	Delete(identity.Stream)
 	Items() func(yield func(identity.Stream, T) bool) bool
 	Len() int
+	Clear()
 }
 
-var _ Map[any] = HashMap[any](nil)
+var _ Map[any] = &HashMap[any]{}
 
 type HashMap[T any] map[identity.Stream]T
 
-func (m HashMap[T]) Load(id identity.Stream) (T, bool) {
-	v, ok := (map[identity.Stream]T)(m)[id]
+func (m *HashMap[T]) Load(id identity.Stream) (T, bool) {
+	v, ok := (*m)[id]
 	return v, ok
 }
 
-func (m HashMap[T]) Store(id identity.Stream, v T) error {
-	(map[identity.Stream]T)(m)[id] = v
+func (m *HashMap[T]) Store(id identity.Stream, v T) error {
+	(*m)[id] = v
 	return nil
 }
 
-func (m HashMap[T]) Delete(id identity.Stream) {
-	delete((map[identity.Stream]T)(m), id)
+func (m *HashMap[T]) Delete(id identity.Stream) {
+	delete(*m, id)
 }
 
-func (m HashMap[T]) Items() func(yield func(identity.Stream, T) bool) bool {
+func (m *HashMap[T]) Items() func(yield func(identity.Stream, T) bool) bool {
 	return func(yield func(identity.Stream, T) bool) bool {
-		for id, v := range (map[identity.Stream]T)(m) {
+		for id, v := range *m {
 			if !yield(id, v) {
 				break
 			}
@@ -47,8 +48,12 @@ func (m HashMap[T]) Items() func(yield func(identity.Stream, T) bool) bool {
 	}
 }
 
-func (m HashMap[T]) Len() int {
-	return len((map[identity.Stream]T)(m))
+func (m *HashMap[T]) Len() int {
+	return len(*m)
+}
+
+func (m *HashMap[T]) Clear() {
+	*m = map[identity.Stream]T{}
 }
 
 // Evictors remove the "least important" stream based on some strategy such as
diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta.go b/processor/deltatocumulativeprocessor/internal/delta/delta.go
index 5539eb8c8e49..891efbfcb2b1 100644
--- a/processor/deltatocumulativeprocessor/internal/delta/delta.go
+++ b/processor/deltatocumulativeprocessor/internal/delta/delta.go
@@ -15,7 +15,7 @@ import (
 
 func New[D data.Point[D]]() Accumulator[D] {
 	return Accumulator[D]{
-		Map: make(exp.HashMap[D]),
+		Map: &exp.HashMap[D]{},
 	}
 }
 
diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go
index 04ffffbde5f5..65c7c003aa90 100644
--- a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go
+++ b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go
@@ -18,7 +18,7 @@ import (
 
 func TestLimit(t *testing.T) {
 	sum := random.Sum()
-	items := make(exp.HashMap[data.Number])
+	items := &exp.HashMap[data.Number]{}
 	lim := streams.Limit(items, 10)
 
 	ids := make([]identity.Stream, 10)
diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go
index 8d5ba7533820..5ce8130336fc 100644
--- a/processor/intervalprocessor/processor.go
+++ b/processor/intervalprocessor/processor.go
@@ -249,6 +249,11 @@ func (p *Processor) exportMetrics() {
 	if err := p.nextConsumer.ConsumeMetrics(p.ctx, md); err != nil {
 		p.logger.Error("Metrics export failed", zap.Error(err))
 	}
+
+	// Clear everything now that we've exported
+	p.numbers.Clear()
+	p.histograms.Clear()
+	p.expHistograms.Clear()
 }
 
 func getOrCreateMetric(
diff --git a/processor/intervalprocessor/processor_test.go b/processor/intervalprocessor/processor_test.go
index 5dc7d3cd4cc8..6d6eea35a0d0 100644
--- a/processor/intervalprocessor/processor_test.go
+++ b/processor/intervalprocessor/processor_test.go
@@ -15,8 +15,6 @@ import (
 	"go.opentelemetry.io/collector/processor/processortest"
 )
 
-// TODO: Add tests for the other data types. Ensuring things like: gauges are passed through unchanged, etc.
-
 // TODO: Add tests for data expiration
 
 func TestAggregation(t *testing.T) {
@@ -28,7 +26,6 @@ func TestAggregation(t *testing.T) {
 		next    []testMetric
 		outputs []testMetric
 	}{
-		// TODO: Add many more test cases for all the edge cases
 		{
 			name: "BasicAggregation",
 			inputs: []testMetric{
@@ -259,14 +256,25 @@
 	// Pretend we hit the interval timer and call export
 	processor.exportMetrics()
 
-	// Next should have gotten two data sets:
+	// Processor should now be empty
+	require.Equal(t, 0, processor.numbers.Len())
+	require.Equal(t, 0, processor.histograms.Len())
+	require.Equal(t, 0, processor.expHistograms.Len())
+
+	// Exporting again should return nothing
+	processor.exportMetrics()
+
+	// Next should have gotten three data sets:
 	// 1. Anything left over from ConsumeMetrics()
 	// 2. Anything exported from exportMetrics()
+	// 3. An empty entry for the second call to exportMetrics()
 	allMetrics := next.AllMetrics()
-	require.Len(t, allMetrics, 2)
+	require.Len(t, allMetrics, 3)
 	nextData := convertMetricsToTestData(t, allMetrics[0])
 	exportData := convertMetricsToTestData(t, allMetrics[1])
+	secondExportData := convertMetricsToTestData(t, allMetrics[2])
+	require.Empty(t, secondExportData)
 
 	expectedNextData := testMetrics{
 		{
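The pivot of this patch is that every `HashMap` method moves from a value receiver to a pointer receiver so that the new `Clear` can swap in a fresh map, which is also why all construction sites change from `make(HashMap[T])` to `&HashMap[T]{}`. Below is a minimal standalone sketch of why the pointer receiver is required; `hashMap` here is a hypothetical stand-in for `streams.HashMap` (keyed by `string` instead of `identity.Stream`), not code from the patch:

```go
package main

import "fmt"

// hashMap is a simplified, illustrative stand-in for streams.HashMap.
type hashMap[T any] map[string]T

// Store writes through the shared backing store; it would also work with a
// value receiver, but uses a pointer to keep one coherent method set.
func (m *hashMap[T]) Store(k string, v T) { (*m)[k] = v }

func (m *hashMap[T]) Len() int { return len(*m) }

// Clear swaps in a fresh map. This reassignment is what forces the pointer
// receivers: with a value receiver, m is a copy of the map header, so
// assigning a new map to it would be lost when the method returned.
func (m *hashMap[T]) Clear() { *m = map[string]T{} }

func main() {
	m := &hashMap[int]{}
	m.Store("a", 1)
	m.Store("b", 2)
	fmt.Println(m.Len()) // 2
	m.Clear()
	fmt.Println(m.Len()) // 0
}
```

As a design note: Go 1.21's `clear` builtin would empty the map in place (and would work even with a value receiver, since the copied header still points at the shared buckets), but it keeps the allocated buckets around; reassigning, as the patch does, lets the old storage be garbage-collected, which fits the interval processor's export-then-reset cycle.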