Skip to content

Commit

Permalink
re-design with timer reservoirs correctly independent per metric
Browse files Browse the repository at this point in the history
re-design with the notion of sample rate appended to the metric
  • Loading branch information
maciuszek committed Jan 10, 2025
1 parent f35471d commit 57ef42b
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 50 deletions.
4 changes: 4 additions & 0 deletions logging_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (s *loggingSink) FlushGauge(name string, value uint64) { s.log(name, "gauge

func (s *loggingSink) FlushTimer(name string, value float64) { s.log(name, "timer", value) }

// FlushTimerWithSampleRate logs the timer value under the "timer" type, the
// same way FlushTimer does. The sample rate argument is ignored and does not
// appear in the log output.
func (s *loggingSink) FlushTimerWithSampleRate(name string, value float64, _ float64) {
s.log(name, "timer", value)
}

func (s *loggingSink) Flush() { s.log("", "all stats", 0) }

// Logger
Expand Down
13 changes: 13 additions & 0 deletions mock/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,19 @@ func (s *Sink) FlushTimer(name string, val float64) {
atomic.AddInt64(&p.count, 1)
}

// FlushTimerWithSampleRate implements the stats.Sink.FlushTimerWithSampleRate
// method and adds val to stat name. The sample rate argument is ignored.
func (s *Sink) FlushTimerWithSampleRate(name string, val float64, _ float64) {
timers := s.timers()
// Try a plain Load first so an entry is only allocated when the name has
// not been seen yet.
v, ok := timers.Load(name)
if !ok {
v, _ = timers.LoadOrStore(name, new(entry))
}
p := v.(*entry)
// Accumulate the value and bump the number of flushes recorded for name.
atomicAddFloat64(&p.val, val)
atomic.AddInt64(&p.count, 1)
}

// LoadCounter returns the value for stat name and if it was found.
func (s *Sink) LoadCounter(name string) (uint64, bool) {
v, ok := s.counters().Load(name)
Expand Down
20 changes: 14 additions & 6 deletions net_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,23 @@ func (s *netSink) FlushGauge(name string, value uint64) {
}

func (s *netSink) FlushTimer(name string, value float64) {
// Timer values are floats but are often whole numbers encoded as a
// float. Delegate to optimizedFloatFlush, which uses the faster integer
// formatting whenever possible, with the plain timer suffix.
s.optimizedFloatFlush(name, value, "|ms\n")
}

// FlushTimerWithSampleRate writes a timer value followed by its sample rate,
// using the suffix "|ms|@<rate>\n".
//
// The rate is formatted with %g (the shortest decimal that uniquely
// identifies the float64) instead of a fixed single decimal. With "%.1f" any
// rate below 0.05 was emitted as "0.0" and other rates were distorted
// (0.25 became "0.2"), so the receiver would scale the sample by the wrong
// factor or see an invalid rate. Note that a rate of exactly 1 is now written
// as "1", and rates below 1e-4 use exponent notation (e.g. "1e-05").
func (s *netSink) FlushTimerWithSampleRate(name string, value float64, sampleRate float64) {
	suffix := fmt.Sprintf("|ms|@%g\n", sampleRate)
	s.optimizedFloatFlush(name, value, suffix)
}

func (s *netSink) optimizedFloatFlush(name string, value float64, suffix string) {
// Since we sometimes use floating point values (e.g. when representing time
// durations), data is often an integer encoded as a float.
// Formatting integers is much faster (>2x) than formatting
// floats so use integer formatting whenever possible.
//
if 0 <= value && value < math.MaxUint64 && math.Trunc(value) == value {
s.flushUint64(name, "|ms\n", uint64(value))
s.flushUint64(name, suffix, uint64(value))
} else {
s.flushFloat64(name, "|ms\n", value)
s.flushFloat64(name, suffix, value)
}
}

Expand Down
6 changes: 6 additions & 0 deletions net_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func (s *testStatSink) FlushTimer(name string, value float64) {
s.Unlock()
}

// FlushTimerWithSampleRate appends one "name:value|ms|@rate" line to the
// sink's record while holding the sink's lock.
func (s *testStatSink) FlushTimerWithSampleRate(name string, value float64, sampleRate float64) {
	// Format before taking the lock so the critical section is only the append.
	line := fmt.Sprintf("%s:%f|ms|@%f\n", name, value, sampleRate)

	s.Lock()
	defer s.Unlock()
	s.record += line
}

func TestCreateTimer(t *testing.T) {
sink := &testStatSink{}
store := NewStore(sink, true)
Expand Down
2 changes: 2 additions & 0 deletions null_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ func (s nullSink) FlushGauge(name string, value uint64) {} //nolint:revive

func (s nullSink) FlushTimer(name string, value float64) {} //nolint:revive

// FlushTimerWithSampleRate is a no-op: the name, value and sample rate are discarded.
func (s nullSink) FlushTimerWithSampleRate(name string, value float64, sampleRate float64) {} //nolint:revive

func (s nullSink) Flush() {}
1 change: 1 addition & 0 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type Sink interface {
FlushCounter(name string, value uint64)
FlushGauge(name string, value uint64)
FlushTimer(name string, value float64)
FlushTimerWithSampleRate(name string, value float64, sampleRate float64)
}

// FlushableSink is an extension of Sink that provides a Flush() function that
Expand Down
93 changes: 61 additions & 32 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package stats

import (
"context"
"math"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -304,7 +303,8 @@ type timer interface {
AddDuration(time.Duration)
AddValue(float64)
AllocateSpan() Timespan
Value() float64
Values() []float64
SampleRate() float64
}

type standardTimer struct {
Expand All @@ -329,14 +329,22 @@ func (t *standardTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

func (t *standardTimer) Value() float64 {
return 0.0 // float zero value
// Values always returns nil for a standardTimer.
func (t *standardTimer) Values() []float64 {
return nil
}

// SampleRate always returns 0.0 for a standardTimer.
func (t *standardTimer) SampleRate() float64 {
return 0.0 // todo: using the zero value of float64. The correct value would be 1.0 given 1 stat; however, that 1 stat is never stored, just flushed right away
}

type reservoirTimer struct {
base time.Duration
name string
value uint64
base time.Duration
name string
capacity int
values []float64
fill int // todo: the only purpose of this is to be faster than calculating len(values), is it worht it?
count int
mu sync.Mutex
}

func (t *reservoirTimer) time(dur time.Duration) {
Expand All @@ -348,16 +356,38 @@ func (t *reservoirTimer) AddDuration(dur time.Duration) {
}

func (t *reservoirTimer) AddValue(value float64) {
// todo: does this need to be atomic? ideally for the use case it won't/shouldn't be changed like a counter/gauge would be
atomic.StoreUint64(&t.value, math.Float64bits(value))
t.mu.Lock()
defer t.mu.Unlock()

// todo: consider edge cases for <
if t.fill < t.capacity {
t.values = append(t.values, value)
} else {
// todo: discarding the oldest value when the reference is full, this can probably be smarter
t.values = append(t.values[1:], value)
t.fill--
}

t.fill++
t.count++
}

func (t *reservoirTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

func (t *reservoirTimer) Value() float64 {
return math.Float64frombits(atomic.LoadUint64(&t.value))
// Values returns a snapshot of the samples currently held in the reservoir.
// The returned slice is a fresh copy made under the timer's lock, so callers
// may read or modify it without affecting the reservoir.
func (t *reservoirTimer) Values() []float64 {
	t.mu.Lock()
	snapshot := append(make([]float64, 0, len(t.values)), t.values...)
	t.mu.Unlock()

	return snapshot
}

// SampleRate returns the fraction of recorded values that are currently
// retained in the reservoir (fill / count).
//
// fill and count are written by AddValue while holding mu, so they are read
// under the same lock here to avoid a data race. When nothing has been
// recorded yet the rate is reported as 1.0 (no samples were dropped) instead
// of the NaN that 0/0 would produce.
func (t *reservoirTimer) SampleRate() float64 {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.count == 0 {
		return 1.0
	}
	return float64(t.fill) / float64(t.count) // todo: is it faster to store these values as float64 instead of converting here
}

type timespan struct {
Expand All @@ -378,9 +408,7 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) {
type statStore struct {
counters sync.Map
gauges sync.Map
timers sync.Map

timerCount int
timers sync.Map // todo: should be control this count, especially for reservoirs we will be storing a lot of these in memory before flushing them

mu sync.RWMutex
statGenerators []StatGenerator
Expand Down Expand Up @@ -438,13 +466,13 @@ func (s *statStore) Flush() {
settings := GetSettings() // todo: move this to some shared memory
if settings.isTimerReservoirEnabled() {
s.timers.Range(func(key, v interface{}) bool {
// todo: maybe change this to not even add to the reservoir
// do not flush timers that are zero value
if value := v.(timer).Value(); value != 0.0 {
s.sink.FlushTimer(key.(string), v.(timer).Value())
timer := v.(timer)
sampleRate := timer.SampleRate()
for _, value := range timer.Values() {
s.sink.FlushTimerWithSampleRate(key.(string), value, sampleRate)
}

s.timers.Delete(key)
s.timerCount--
return true
})
}
Expand Down Expand Up @@ -554,27 +582,28 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer {
var t timer
settings := GetSettings() // todo: move this to some shared memory
if settings.isTimerReservoirEnabled() {
t = &reservoirTimer{name: serializedName, base: base}

// todo: > shouldn't be necessary
if s.timerCount >= settings.TimerReservoirSize {
// todo: this will delete 1 random timer in the map, this can probably be smarter
s.timers.Range(func(key, _ interface{}) bool {
s.timers.Delete(key)
s.timerCount--
return false
})
// todo: have defaults defined in a shared location
t = &reservoirTimer{
name: serializedName,
base: base,
capacity: 100,
values: make([]float64, 0, 100),
fill: 0,
count: 0,
}
} else {
t = &standardTimer{name: serializedName, sink: s.sink, base: base}
t = &standardTimer{
name: serializedName,
sink: s.sink,
base: base,
}
}

// todo: why would the timer ever be replaced, will this hurt reservoirs or benefit them? or is it just redundant since we load above?
if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded {
return v.(timer)
}

s.timerCount++

return t
}

Expand Down
21 changes: 9 additions & 12 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestTimerReservoir_Disabled(t *testing.T) {
store := NewStore(sink, true)

for i := 0; i < 1000; i++ {
store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10))
store.NewTimer("test").AddValue(float64(i % 10))
}

if ts.String() != "" {
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestTimerReservoir(t *testing.T) {
store := NewStore(sink, true)

for i := 0; i < 1000; i++ {
store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i%10 + 1)) // don't create timers with 0 values to make the count deterministic
store.NewTimer("test").AddValue(float64(i % 10))
}

if ts.String() != "" {
Expand All @@ -190,17 +190,19 @@ func TestTimerReservoir(t *testing.T) {
os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
}

func TestTimerReservoir_FilteredZeros(t *testing.T) {
func TestTimerReservoir_IndependantReservoirs(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
if err != nil {
t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err)
}

expectedStatCount := 1000

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)

for i := 0; i < 1000; i++ {
store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10))
store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10)) // use different names so that we don't conflate the metrics into the same reservoir
}

if ts.String() != "" {
Expand All @@ -211,14 +213,9 @@ func TestTimerReservoir_FilteredZeros(t *testing.T) {

time.Sleep(1001 * time.Millisecond)

stats := strings.Split(ts.String(), "\n")
stats = stats[:len(stats)-1] // remove the extra new line character at the end of the buffer
for _, stat := range stats {
value := strings.Split(strings.Split(stat, ":")[1], ("|ms"))[0] // strip value and remove suffix and get raw number
if value == "0" {
t.Errorf("Got a zero value stat: %s", stat)
}

statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer
if statCount != expectedStatCount {
t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount)
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
Expand Down

0 comments on commit 57ef42b

Please sign in to comment.