Skip to content

Commit

Permalink
Fix concurrency issues with memmetrics. (#105)
Browse files Browse the repository at this point in the history
* Fix concurrency issues with memmetrics.

* Add unit test for memmetrics race condition fix.

* Fix deadlock with Append in roundtrip.go

* Fix race, Inc() write so we need Lock

* Remove go 1.6 tests
  • Loading branch information
emilevauge authored Nov 23, 2017
1 parent da985a4 commit 5dfac99
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 8 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
language: go

go:
- 1.6.x
- 1.7.x
- 1.8.x
- 1.9.x
Expand Down
29 changes: 29 additions & 0 deletions memmetrics/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ func NewHDRHistogram(low, high int64, sigfigs int) (h *HDRHistogram, err error)
}, nil
}

// Export returns a new HDRHistogram carrying the same low, high and sigfigs
// settings as the receiver. When the receiver wraps an underlying histogram,
// the copy's histogram is rebuilt from a snapshot of it (Export followed by
// hdrhistogram.Import); otherwise the copy's histogram is left nil.
func (r *HDRHistogram) Export() *HDRHistogram {
	clone := &HDRHistogram{low: r.low, high: r.high, sigfigs: r.sigfigs}
	if r.h != nil {
		clone.h = hdrhistogram.Import(r.h.Export())
	}
	return clone
}

// LatencyAtQuantile returns the latency at quantile q with microsecond precision.
func (h *HDRHistogram) LatencyAtQuantile(q float64) time.Duration {
return time.Duration(h.ValueAtQuantile(q)) * time.Microsecond
Expand Down Expand Up @@ -118,6 +127,26 @@ func NewRollingHDRHistogram(low, high int64, sigfigs int, period time.Duration,
return rh, nil
}

// Export returns a new RollingHDRHistogram with the same index, last roll
// time, period, bucket count, bounds, precision and clock as the receiver.
// Every bucket is exported individually, so the returned value has its own
// bucket slice; the clock value itself is copied by assignment.
func (r *RollingHDRHistogram) Export() *RollingHDRHistogram {
	// Always allocate (even for zero buckets) so the result's slice is non-nil.
	buckets := make([]*HDRHistogram, len(r.buckets))
	for i := range r.buckets {
		buckets[i] = r.buckets[i].Export()
	}

	return &RollingHDRHistogram{
		idx:         r.idx,
		lastRoll:    r.lastRoll,
		period:      r.period,
		bucketCount: r.bucketCount,
		low:         r.low,
		high:        r.high,
		sigfigs:     r.sigfigs,
		clock:       r.clock,
		buckets:     buckets,
	}
}

func (r *RollingHDRHistogram) Append(o *RollingHDRHistogram) error {
if r.bucketCount != o.bucketCount || r.period != o.period || r.low != o.low || r.high != o.high || r.sigfigs != o.sigfigs {
return fmt.Errorf("can't merge")
Expand Down
57 changes: 57 additions & 0 deletions memmetrics/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package memmetrics
import (
"time"

"github.com/codahale/hdrhistogram"
"github.com/mailgun/timetools"
. "gopkg.in/check.v1"
)
Expand Down Expand Up @@ -125,3 +126,59 @@ func (s *HistogramSuite) TestReset(c *C) {
c.Assert(m.ValueAtQuantile(100), Equals, int64(5))

}

// TestHDRHistogramExportReturnsNewCopy checks that HDRHistogram.Export returns
// a value that is unaffected by later changes to the original and that owns a
// distinct underlying histogram.
func (s *HistogramSuite) TestHDRHistogramExportReturnsNewCopy(c *C) {
	// Create HDRHistogram instance
	inner := hdrhistogram.New(0, 1, 2)
	a := HDRHistogram{low: 1, high: 2, sigfigs: 3, h: inner}

	// Get a copy and modify the original
	b := a.Export()
	a.low = 11
	a.high = 12
	a.sigfigs = 4
	a.h = nil

	// Assert the copy has not been modified
	c.Assert(b.low, Equals, int64(1))
	c.Assert(b.high, Equals, int64(2))
	c.Assert(b.sigfigs, Equals, 3)
	c.Assert(b.h, NotNil)
	// NotNil alone would also pass if Export simply aliased the original's
	// histogram pointer, so additionally require a different pointer.
	c.Assert(b.h, Not(Equals), inner)
}

// TestRollingHDRHistogramExportReturnsNewCopy checks that the value returned
// by RollingHDRHistogram.Export keeps the original field values after every
// field of the source has been overwritten.
func (s *HistogramSuite) TestRollingHDRHistogramExportReturnsNewCopy(c *C) {
	origTime := time.Now()
	a := RollingHDRHistogram{
		idx:         1,
		lastRoll:    origTime,
		period:      2 * time.Second,
		bucketCount: 3,
		low:         4,
		high:        5,
		sigfigs:     1,
		buckets:     []*HDRHistogram{},
		clock:       s.tm,
	}

	// Get a copy, then overwrite every field of the original with a
	// different value so that any shared state would show up below.
	b := a.Export()
	a.idx = 11
	a.lastRoll = time.Now().Add(1 * time.Minute)
	a.period = 12 * time.Second
	a.bucketCount = 13
	a.low = 14
	a.high = 15
	a.sigfigs = 11 // must differ from the exported value for the check to mean anything
	a.buckets = nil
	a.clock = nil

	// Assert the copy has not been modified
	c.Assert(b.idx, Equals, 1)
	c.Assert(b.lastRoll, Equals, origTime)
	c.Assert(b.period, Equals, 2*time.Second)
	c.Assert(b.bucketCount, Equals, 3)
	c.Assert(b.low, Equals, int64(4))
	c.Assert(b.high, Equals, int64(5))
	c.Assert(b.sigfigs, Equals, 1)
	c.Assert(b.buckets, NotNil)
	c.Assert(b.clock, NotNil)
}
81 changes: 74 additions & 7 deletions memmetrics/roundtrip.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package memmetrics

import (
"errors"
"net/http"
"sync"
"time"

"github.com/mailgun/timetools"
Expand All @@ -13,10 +15,12 @@ import (
// are rolling window histograms with defined precision as well.
// See RTOptions for more detail on parameters.
type RTMetrics struct {
total *RollingCounter
netErrors *RollingCounter
statusCodes map[int]*RollingCounter
histogram *RollingHDRHistogram
total *RollingCounter
netErrors *RollingCounter
statusCodes map[int]*RollingCounter
statusCodesLock sync.RWMutex
histogram *RollingHDRHistogram
histogramLock sync.RWMutex

newCounter NewCounterFn
newHist NewRollingHistogramFn
Expand Down Expand Up @@ -53,7 +57,8 @@ func RTClock(clock timetools.TimeProvider) rrOptSetter {
// NewRTMetrics returns new instance of metrics collector.
func NewRTMetrics(settings ...rrOptSetter) (*RTMetrics, error) {
m := &RTMetrics{
statusCodes: make(map[int]*RollingCounter),
statusCodes: make(map[int]*RollingCounter),
statusCodesLock: sync.RWMutex{},
}
for _, s := range settings {
if err := s(m); err != nil {
Expand Down Expand Up @@ -98,6 +103,33 @@ func NewRTMetrics(settings ...rrOptSetter) (*RTMetrics, error) {
return m, nil
}

// Export returns a new RTMetrics which is a copy of the current one.
//
// Read locks on statusCodesLock and histogramLock are held for the whole call.
// The total and netErrors counters and every per-status-code counter are
// cloned, the histogram (when present) is exported, and the counter/histogram
// factories and the clock are copied by assignment. The returned value carries
// its own zero-value locks.
func (m *RTMetrics) Export() *RTMetrics {
	m.statusCodesLock.RLock()
	defer m.statusCodesLock.RUnlock()
	m.histogramLock.RLock()
	defer m.histogramLock.RUnlock()

	export := &RTMetrics{
		total:       m.total.Clone(),
		netErrors:   m.netErrors.Clone(),
		statusCodes: make(map[int]*RollingCounter, len(m.statusCodes)),
		newCounter:  m.newCounter,
		newHist:     m.newHist,
		clock:       m.clock,
	}
	for code, counter := range m.statusCodes {
		export.statusCodes[code] = counter.Clone()
	}
	if m.histogram != nil {
		export.histogram = m.histogram.Export()
	}

	return export
}

// CounterWindowSize returns the window size reported by the total counter
// (m.total.WindowSize()).
func (m *RTMetrics) CounterWindowSize() time.Duration {
return m.total.WindowSize()
}
Expand All @@ -115,6 +147,8 @@ func (m *RTMetrics) NetworkErrorRatio() float64 {
func (m *RTMetrics) ResponseCodeRatio(startA, endA, startB, endB int) float64 {
a := int64(0)
b := int64(0)
m.statusCodesLock.RLock()
defer m.statusCodesLock.RUnlock()
for code, v := range m.statusCodes {
if code < endA && code >= startA {
a += v.Count()
Expand All @@ -130,6 +164,10 @@ func (m *RTMetrics) ResponseCodeRatio(startA, endA, startB, endB int) float64 {
}

func (m *RTMetrics) Append(other *RTMetrics) error {
if m == other {
return errors.New("RTMetrics cannot append to self")
}

if err := m.total.Append(other.total); err != nil {
return err
}
Expand All @@ -138,7 +176,13 @@ func (m *RTMetrics) Append(other *RTMetrics) error {
return err
}

for code, c := range other.statusCodes {
copied := other.Export()

m.statusCodesLock.Lock()
defer m.statusCodesLock.Unlock()
m.histogramLock.Lock()
defer m.histogramLock.Unlock()
for code, c := range copied.statusCodes {
o, ok := m.statusCodes[code]
if ok {
if err := o.Append(c); err != nil {
Expand All @@ -149,7 +193,7 @@ func (m *RTMetrics) Append(other *RTMetrics) error {
}
}

return m.histogram.Append(other.histogram)
return m.histogram.Append(copied.histogram)
}

func (m *RTMetrics) Record(code int, duration time.Duration) {
Expand All @@ -174,6 +218,8 @@ func (m *RTMetrics) NetworkErrorCount() int64 {
// StatusCodesCounts returns a map with counts of the response codes
func (m *RTMetrics) StatusCodesCounts() map[int]int64 {
sc := make(map[int]int64)
m.statusCodesLock.RLock()
defer m.statusCodesLock.RUnlock()
for k, v := range m.statusCodes {
if v.Count() != 0 {
sc[k] = v.Count()
Expand All @@ -184,10 +230,16 @@ func (m *RTMetrics) StatusCodesCounts() map[int]int64 {

// LatencyHistogram computes and returns resulting histogram with latencies observed.
// It holds histogramLock exclusively for the duration of the call and returns
// whatever m.histogram.Merged() yields.
func (m *RTMetrics) LatencyHistogram() (*HDRHistogram, error) {
// NOTE(review): a write lock (not RLock) is taken here; presumably Merged may
// modify the rolling histogram — confirm before relaxing it.
m.histogramLock.Lock()
defer m.histogramLock.Unlock()
return m.histogram.Merged()
}

func (m *RTMetrics) Reset() {
m.statusCodesLock.Lock()
defer m.statusCodesLock.Unlock()
m.histogramLock.Lock()
defer m.histogramLock.Unlock()
m.histogram.Reset()
m.total.Reset()
m.netErrors.Reset()
Expand All @@ -200,14 +252,29 @@ func (m *RTMetrics) recordNetError() error {
}

// recordLatency records one observation of duration d in the latency
// histogram via RecordLatencies(d, 1), holding histogramLock exclusively while
// doing so. It returns the error from RecordLatencies.
func (m *RTMetrics) recordLatency(d time.Duration) error {
m.histogramLock.Lock()
defer m.histogramLock.Unlock()
return m.histogram.RecordLatencies(d, 1)
}

func (m *RTMetrics) recordStatusCode(statusCode int) error {
m.statusCodesLock.Lock()
if c, ok := m.statusCodes[statusCode]; ok {
c.Inc(1)
m.statusCodesLock.Unlock()
return nil
}
m.statusCodesLock.Unlock()

m.statusCodesLock.Lock()
defer m.statusCodesLock.Unlock()

// Check if another goroutine has written our counter already
if c, ok := m.statusCodes[statusCode]; ok {
c.Inc(1)
return nil
}

c, err := m.newCounter()
if err != nil {
return err
Expand Down
70 changes: 70 additions & 0 deletions memmetrics/roundtrip_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package memmetrics

import (
"runtime"
"sync"
"time"

"github.com/mailgun/timetools"
Expand Down Expand Up @@ -79,3 +81,71 @@ func (s *RRSuite) TestAppend(c *C) {
c.Assert(err, IsNil)
c.Assert(int(h.LatencyAtQuantile(100)/time.Second), Equals, 3)
}

// TestConcurrentRecords calls recordStatusCode from many goroutines at once so
// that unsynchronised access to the status code map shows up (most reliably
// under the race detector). The test waits for every goroutine to finish and
// fails if any of the calls returned an error.
func (s *RRSuite) TestConcurrentRecords(c *C) {
	// This test asserts a race condition which requires parallelism.
	// GOMAXPROCS is process-wide, so put the previous value back afterwards.
	prevProcs := runtime.GOMAXPROCS(100)
	defer runtime.GOMAXPROCS(prevProcs)

	rr, err := NewRTMetrics(RTClock(s.tm))
	c.Assert(err, IsNil)

	const (
		codes          = 100
		recordsPerCode = 10
	)

	// Buffered for every result so that no goroutine blocks on send.
	errs := make(chan error, codes*recordsPerCode)
	var wg sync.WaitGroup
	for code := 0; code < codes; code++ {
		for numRecords := 0; numRecords < recordsPerCode; numRecords++ {
			wg.Add(1)
			go func(statusCode int) {
				defer wg.Done()
				errs <- rr.recordStatusCode(statusCode)
			}(code)
		}
	}

	// Without waiting, the test could return before any goroutine has run
	// and the concurrent code path would never be exercised.
	wg.Wait()
	close(errs)

	// Assertions are made on the test goroutine, not inside the workers.
	for recordErr := range errs {
		c.Assert(recordErr, IsNil)
	}
}

// TestRTMetricExportReturnsNewCopy checks that RTMetrics.Export returns a
// value whose fields survive the original being cleared, and whose locks are
// not the same locks as the original's.
func (s *RRSuite) TestRTMetricExportReturnsNewCopy(c *C) {
	// Assemble the source metrics by hand.
	a := RTMetrics{
		clock:           &timetools.RealTime{},
		statusCodes:     map[int]*RollingCounter{},
		statusCodesLock: sync.RWMutex{},
		histogram:       &RollingHDRHistogram{},
		histogramLock:   sync.RWMutex{},
	}
	a.total, _ = NewCounter(1, time.Second, CounterClock(a.clock))
	a.netErrors, _ = NewCounter(1, time.Second, CounterClock(a.clock))
	a.newCounter = func() (*RollingCounter, error) {
		return NewCounter(counterBuckets, counterResolution, CounterClock(a.clock))
	}
	a.newHist = func() (*RollingHDRHistogram, error) {
		return NewRollingHDRHistogram(histMin, histMax, histSignificantFigures, histPeriod, histBuckets, RollingClock(a.clock))
	}

	// Take the copy, then wipe every exported-from field on the original.
	b := a.Export()
	a.total = nil
	a.netErrors = nil
	a.statusCodes = nil
	a.histogram = nil
	a.newCounter = nil
	a.newHist = nil
	a.clock = nil

	// The copy must still hold all of its values.
	c.Assert(b.total, NotNil)
	c.Assert(b.netErrors, NotNil)
	c.Assert(b.statusCodes, NotNil)
	c.Assert(b.histogram, NotNil)
	c.Assert(b.newCounter, NotNil)
	c.Assert(b.newHist, NotNil)
	c.Assert(b.clock, NotNil)

	// a and b should have different locks: if a lock were shared, taking it
	// on b after taking it on a would block and the signal below would
	// never be sent.
	locksSucceed := make(chan bool)
	go func() {
		a.statusCodesLock.Lock()
		b.statusCodesLock.Lock()
		a.histogramLock.Lock()
		b.histogramLock.Lock()
		locksSucceed <- true
	}()

	// Either the goroutine reports success or the timeout fails the test.
	select {
	case <-locksSucceed:
	case <-time.After(10 * time.Second):
		c.FailNow()
	}
}

0 comments on commit 5dfac99

Please sign in to comment.