memmetrics: simplify locking and solve data race #209

Open · wants to merge 4 commits into master
77 changes: 37 additions & 40 deletions memmetrics/roundtrip.go
@@ -15,16 +15,15 @@ import (
// are rolling window histograms with defined precision as well.
// See RTOptions for more detail on parameters.
type RTMetrics struct {
total *RollingCounter
netErrors *RollingCounter
statusCodes map[int]*RollingCounter
statusCodesLock sync.RWMutex
histogram *RollingHDRHistogram
histogramLock sync.RWMutex

newCounter NewCounterFn
newHist NewRollingHistogramFn
clock timetools.TimeProvider
// lock protects all data members.
lock sync.Mutex
total *RollingCounter
netErrors *RollingCounter
statusCodes map[int]*RollingCounter
histogram *RollingHDRHistogram
newCounter NewCounterFn
newHist NewRollingHistogramFn
clock timetools.TimeProvider
}

type rrOptSetter func(r *RTMetrics) error
@@ -65,8 +64,7 @@ func RTClock(clock timetools.TimeProvider) rrOptSetter {
// NewRTMetrics returns new instance of metrics collector.
func NewRTMetrics(settings ...rrOptSetter) (*RTMetrics, error) {
m := &RTMetrics{
statusCodes: make(map[int]*RollingCounter),
statusCodesLock: sync.RWMutex{},
statusCodes: make(map[int]*RollingCounter),
}
for _, s := range settings {
if err := s(m); err != nil {
@@ -113,14 +111,10 @@ func NewRTMetrics(settings ...rrOptSetter) (*RTMetrics, error) {

// Export Returns a new RTMetrics which is a copy of the current one
func (m *RTMetrics) Export() *RTMetrics {
m.statusCodesLock.RLock()
defer m.statusCodesLock.RUnlock()
m.histogramLock.RLock()
defer m.histogramLock.RUnlock()
m.lock.Lock()
defer m.lock.Unlock()

export := &RTMetrics{}
export.statusCodesLock = sync.RWMutex{}
export.histogramLock = sync.RWMutex{}
export.total = m.total.Clone()
export.netErrors = m.netErrors.Clone()
exportStatusCodes := map[int]*RollingCounter{}
@@ -140,12 +134,16 @@ func (m *RTMetrics) Export() *RTMetrics {

// CounterWindowSize gets total windows size
func (m *RTMetrics) CounterWindowSize() time.Duration {
m.lock.Lock()
defer m.lock.Unlock()
return m.total.WindowSize()
}

// NetworkErrorRatio calculates the amount of network errors such as timeouts and dropped connections
// that occurred in the given time window compared to the total requests count.
func (m *RTMetrics) NetworkErrorRatio() float64 {
m.lock.Lock()
defer m.lock.Unlock()
if m.total.Count() == 0 {
return 0
}
@@ -154,10 +152,10 @@ func NetworkErrorRatio() float64 {

// ResponseCodeRatio calculates ratio of count(startA to endA) / count(startB to endB)
func (m *RTMetrics) ResponseCodeRatio(startA, endA, startB, endB int) float64 {
m.lock.Lock()
defer m.lock.Unlock()
a := int64(0)
b := int64(0)
m.statusCodesLock.RLock()
defer m.statusCodesLock.RUnlock()
for code, v := range m.statusCodes {
if code < endA && code >= startA {
a += v.Count()
@@ -174,6 +172,9 @@ func (m *RTMetrics) ResponseCodeRatio(startA, endA, startB, endB int) float64 {

// Append append a metric
func (m *RTMetrics) Append(other *RTMetrics) error {
m.lock.Lock()
defer m.lock.Unlock()

if m == other {
return errors.New("RTMetrics cannot append to self")
}
@@ -188,10 +189,6 @@ func (m *RTMetrics) Append(other *RTMetrics) error {

copied := other.Export()

m.statusCodesLock.Lock()
defer m.statusCodesLock.Unlock()
m.histogramLock.Lock()
defer m.histogramLock.Unlock()
for code, c := range copied.statusCodes {
o, ok := m.statusCodes[code]
if ok {
@@ -208,6 +205,9 @@ func (m *RTMetrics) Append(other *RTMetrics) error {

// Record records a metric
func (m *RTMetrics) Record(code int, duration time.Duration) {
m.lock.Lock()
defer m.lock.Unlock()

m.total.Inc(1)
if code == http.StatusGatewayTimeout || code == http.StatusBadGateway {
m.netErrors.Inc(1)
@@ -218,19 +218,24 @@ func (m *RTMetrics) Record(code int, duration time.Duration) {

// TotalCount returns total count of processed requests collected.
func (m *RTMetrics) TotalCount() int64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.total.Count()
}

// NetworkErrorCount returns total count of processed requests observed
func (m *RTMetrics) NetworkErrorCount() int64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.netErrors.Count()
}

// StatusCodesCounts returns map with counts of the response codes
func (m *RTMetrics) StatusCodesCounts() map[int]int64 {
m.lock.Lock()
defer m.lock.Unlock()

sc := make(map[int]int64)
m.statusCodesLock.RLock()
defer m.statusCodesLock.RUnlock()
for k, v := range m.statusCodes {
if v.Count() != 0 {
sc[k] = v.Count()
@@ -241,40 +246,32 @@ func (m *RTMetrics) StatusCodesCounts() map[int]int64 {

// LatencyHistogram computes and returns resulting histogram with latencies observed.
func (m *RTMetrics) LatencyHistogram() (*HDRHistogram, error) {
m.histogramLock.Lock()
defer m.histogramLock.Unlock()
m.lock.Lock()
defer m.lock.Unlock()
return m.histogram.Merged()
}

// Reset reset metrics
func (m *RTMetrics) Reset() {
m.statusCodesLock.Lock()
defer m.statusCodesLock.Unlock()
m.histogramLock.Lock()
defer m.histogramLock.Unlock()
m.lock.Lock()
defer m.lock.Unlock()
m.histogram.Reset()
m.total.Reset()
m.netErrors.Reset()
m.statusCodes = make(map[int]*RollingCounter)
}

// WARNING: Lock must be held before calling.
func (m *RTMetrics) recordLatency(d time.Duration) error {
m.histogramLock.Lock()
defer m.histogramLock.Unlock()
return m.histogram.RecordLatencies(d, 1)
}

// WARNING: Lock must be held before calling.
func (m *RTMetrics) recordStatusCode(statusCode int) error {
m.statusCodesLock.Lock()
if c, ok := m.statusCodes[statusCode]; ok {
c.Inc(1)
m.statusCodesLock.Unlock()
return nil
}
m.statusCodesLock.Unlock()

m.statusCodesLock.Lock()
defer m.statusCodesLock.Unlock()

// Check if another goroutine has written our counter already
if c, ok := m.statusCodes[statusCode]; ok {
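The pattern the diff above settles on is a common one in Go: a single sync.Mutex guards every field, exported methods acquire it exactly once, and unexported helpers document that they expect the caller to already hold it. A minimal, self-contained sketch of that convention (simplified, with hypothetical names — the real type is RTMetrics in memmetrics/roundtrip.go):

```go
package main

import (
	"fmt"
	"sync"
)

// metrics mirrors the locking style adopted in the PR: one mutex
// protects all data members, instead of a separate RWMutex per group.
type metrics struct {
	mu          sync.Mutex // protects every field below
	total       int64
	statusCodes map[int]int64
}

// Record is an exported entry point: it takes the lock once, then calls
// unexported helpers that assume the lock is already held.
func (m *metrics) Record(code int) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.total++
	m.recordStatusCode(code)
}

// recordStatusCode must only be called with m.mu held, mirroring the
// "WARNING: Lock must be held before calling." comments in the diff.
func (m *metrics) recordStatusCode(code int) {
	if m.statusCodes == nil {
		m.statusCodes = make(map[int]int64)
	}
	m.statusCodes[code]++
}

func main() {
	m := &metrics{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(code int) {
			defer wg.Done()
			m.Record(code)
		}(200 + i)
	}
	wg.Wait()
	fmt.Println(m.statusCodes) // e.g. map[200:1 201:1 202:1 203:1]
}
```

Compared with the previous per-field RWMutexes, this gives up some read concurrency but leaves only one lock to reason about, so there is no lock-ordering to get wrong and no window between the two locks for a race.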
89 changes: 63 additions & 26 deletions memmetrics/roundtrip_test.go
@@ -12,6 +12,61 @@ import (
"github.com/vulcand/oxy/testutils"
)

func BenchmarkRecord(b *testing.B) {
b.ReportAllocs()

rr, err := NewRTMetrics(RTClock(testutils.GetClock()))
require.NoError(b, err)

// warm up metrics. Adding a new code can do allocations, but in the steady
// state recording a code is cheap. We want to measure the steady state.
const codes = 100
for code := 0; code < codes; code++ {
rr.Record(code, time.Second)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
rr.Record(i%codes, time.Second)
}
}

func BenchmarkRecordConcurrently(b *testing.B) {
b.ReportAllocs()

rr, err := NewRTMetrics(RTClock(testutils.GetClock()))
require.NoError(b, err)

// warm up metrics. Adding a new code can do allocations, but in the steady
// state recording a code is cheap. We want to measure the steady state.
const codes = 100
for code := 0; code < codes; code++ {
rr.Record(code, time.Second)
}

concurrency := runtime.NumCPU()
b.Logf("NumCPU: %d, Concurrency: %d, GOMAXPROCS: %d",
runtime.NumCPU(), concurrency, runtime.GOMAXPROCS(0))
wg := sync.WaitGroup{}
wg.Add(concurrency)
perG := b.N / concurrency
if perG == 0 {
perG = 1
}

b.ResetTimer()
for i := 0; i < concurrency; i++ {
go func() {
for j := 0; j < perG; j++ {
rr.Record(j%codes, time.Second)
}
wg.Done()
}()
}

wg.Wait()
}

func TestDefaults(t *testing.T) {
rr, err := NewRTMetrics(RTClock(testutils.GetClock()))
require.NoError(t, err)
@@ -75,28 +130,29 @@ func TestAppend(t *testing.T) {
}

func TestConcurrentRecords(t *testing.T) {
// This test asserts a race condition which requires parallelism
// This test asserts a race condition which requires concurrency. Set
// GOMAXPROCS high for this test, then restore after test completes.
n := runtime.GOMAXPROCS(0)
runtime.GOMAXPROCS(100)
defer runtime.GOMAXPROCS(n)

rr, err := NewRTMetrics(RTClock(testutils.GetClock()))
require.NoError(t, err)

for code := 0; code < 100; code++ {
for numRecords := 0; numRecords < 10; numRecords++ {
go func(statusCode int) {
_ = rr.recordStatusCode(statusCode)
rr.Record(statusCode, time.Second)
}(code)
}
}
}

func TestRTMetricExportReturnsNewCopy(t *testing.T) {
a := RTMetrics{
clock: &timetools.RealTime{},
statusCodes: map[int]*RollingCounter{},
statusCodesLock: sync.RWMutex{},
histogram: &RollingHDRHistogram{},
histogramLock: sync.RWMutex{},
clock: &timetools.RealTime{},
statusCodes: map[int]*RollingCounter{},
histogram: &RollingHDRHistogram{},
}

var err error
@@ -129,23 +185,4 @@ func TestRTMetricExportReturnsNewCopy(t *testing.T) {
assert.NotNil(t, b.newCounter)
assert.NotNil(t, b.newHist)
assert.NotNil(t, b.clock)

// a and b should have different locks
locksSucceed := make(chan bool)
go func() {
a.statusCodesLock.Lock()
b.statusCodesLock.Lock()
a.histogramLock.Lock()
b.histogramLock.Lock()
locksSucceed <- true
}()

for {
select {
case <-locksSucceed:
return
case <-time.After(10 * time.Second):
t.FailNow()
}
}
}
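Side note, not part of this PR: BenchmarkRecordConcurrently splits b.N across goroutines by hand; the same steady-state measurement could also be written with testing.B.RunParallel, which distributes b.N over GOMAXPROCS worker goroutines automatically. A sketch reusing the same setup (the benchmark name here is hypothetical):

```go
func BenchmarkRecordRunParallel(b *testing.B) {
	b.ReportAllocs()

	rr, err := NewRTMetrics(RTClock(testutils.GetClock()))
	require.NoError(b, err)

	// Warm up as above so the steady state is measured, not map growth.
	const codes = 100
	for code := 0; code < codes; code++ {
		rr.Record(code, time.Second)
	}

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		i := 0
		for pb.Next() {
			rr.Record(i%codes, time.Second)
			i++
		}
	})
}
```

Either way, `go test -race ./memmetrics/` is the quickest way to confirm the reported data race is gone, and `go test -run '^$' -bench Record ./memmetrics/` runs just the new benchmarks.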