perfm.go
package perfm

import (
	"fmt"
	"io"
	"math"
	"os"
	"sync"
	"sync/atomic"
	"time"

	hist "github.com/arthurkiller/perfm/histogram"
)
// Job gives out a job for parallel execution. The life cycle is:
//  1. start the workers
//     a. each worker calls job.Copy()
//     b. in a loop, the worker calls job.Pre() then job.Do()
//     c. after the loop, the worker calls job.After()
//  2. calculate the summary
type Job interface {
	// Copy returns a copy of the job so workers can run in parallel.
	Copy() (Job, error)
	// Pre will be called before each Do.
	Pre() error
	// Do contains the core work to be measured.
	Do() error
	// After does the cleanup once the job is done.
	After()
}
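
// A minimal sketch of a Job implementation, from a caller's point of view
// (illustrative only, not part of this package; `sleepJob` is a hypothetical
// name):
//
//	type sleepJob struct{}
//
//	func (j *sleepJob) Copy() (perfm.Job, error) { return &sleepJob{}, nil }
//	func (j *sleepJob) Pre() error               { return nil }
//	func (j *sleepJob) Do() error                { time.Sleep(time.Millisecond); return nil }
//	func (j *sleepJob) After()                   {}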
// PerfMonitor defines the actions of the perf monitor.
type PerfMonitor interface {
	Reset(Job) // register the job to perfm
	Start(Job) // start the perf monitor
}
// New creates a PerfMonitor configured with the given options.
func New(options ...Options) PerfMonitor {
	config := NewConfig(options...)
	return NewMonitor(config)
}
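
// A usage sketch from a caller's point of view (illustrative only; `sleepJob`
// is the hypothetical Job implementation sketched above):
//
//	pm := perfm.New(perfm.WithParallel(8), perfm.WithDuration(30))
//	pm.Start(&sleepJob{})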
// BUFFERLEN is the buffer length of the duration channel.
const BUFFERLEN = 0x7FFFF

// ERRORBUFFERLEN is the buffer length of the error-collecting channel.
const ERRORBUFFERLEN = 1
// Collector collects all perfm results and does the statistics.
type Collector struct {
	Sum   float64 // sum of the per-request costs
	Stdev float64 // standard deviation
	Mean  float64 // mean of the distribution
	Total int64   // total request count

	errCount        int64 // counts failed requests
	startTime       time.Time
	runningDuration time.Duration
	maxQPS          int64
	minQPS          int64

	conf          *Config
	wg            sync.WaitGroup
	durationCache chan int64             // duration cache buffer, waiting to be consumed
	histogram     *hist.NumericHistogram // used to print the histogram
	done          chan struct{}          // close-notify channel
	errChan       chan error

	localtimer     <-chan time.Time // print timer
	localCount     int64            // number of requests in the current sampling window
	localTimeCount int64            // total cost of the current sampling window
}
// Config defines the perfm configuration.
type Config struct {
	// manager config
	Duration  int   `json:"duration"`  // benchmark duration in seconds
	Number    int64 `json:"number"`    // total number of requests
	Parallel  int   `json:"parallel"`  // number of parallel workers
	NoPrint   bool  `json:"no_print"`  // disable statistics printing
	Frequency int   `json:"frequency"` // sampling frequency in seconds, controls the precision

	// collector config
	BinsNumber int `json:"bins_number"` // number of histogram bins
}
// NewConfig generates a config with defaults and applies the given options.
func NewConfig(options ...Options) Config {
	c := Config{
		Duration:   10,
		Number:     0,
		Parallel:   4,
		NoPrint:    false,
		Frequency:  1,
		BinsNumber: 15,
	}
	for _, o := range options {
		o(&c)
	}
	return c
}
// Options defines an option setter for Config.
type Options func(*Config)

// WithParallel sets the number of parallel workers.
func WithParallel(i int) Options {
	return func(o *Config) {
		o.Parallel = i
	}
}

// WithDuration sets the benchmark running duration in seconds.
func WithDuration(i int) Options {
	return func(o *Config) {
		o.Duration = i
	}
}

// WithNumber sets the total number of benchmark requests.
func WithNumber(i int64) Options {
	return func(o *Config) {
		o.Number = i
	}
}

// WithFrequency sets the sampling frequency in seconds.
func WithFrequency(i int) Options {
	return func(o *Config) {
		o.Frequency = i
	}
}

// WithBinsNumber sets the number of histogram bins.
func WithBinsNumber(i int) Options {
	return func(o *Config) {
		o.BinsNumber = i
	}
}

// WithNoPrint disables output during benchmarking.
func WithNoPrint() Options {
	return func(o *Config) {
		o.NoPrint = true
	}
}
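
// A combined sketch of the options above (illustrative only):
//
//	cfg := perfm.NewConfig(
//		perfm.WithParallel(8),    // 8 workers
//		perfm.WithDuration(60),   // run for 60 seconds
//		perfm.WithBinsNumber(20), // 20 histogram bins
//		perfm.WithNoPrint(),      // suppress per-second output
//	)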
// NewCollector creates a collector and starts its statistics goroutine:
//  1. create the collector
//  2. run the goroutine that consumes durations
//  3. do the collection
func NewCollector(c *Config) *Collector {
	cc := &Collector{
		wg:             sync.WaitGroup{},
		durationCache:  make(chan int64, BUFFERLEN),
		errChan:        make(chan error, ERRORBUFFERLEN),
		localtimer:     time.Tick(time.Second * time.Duration(c.Frequency)),
		histogram:      hist.NewHistogram(c.BinsNumber),
		done:           make(chan struct{}),
		conf:           c,
		errCount:       0,
		Sum:            0,
		Stdev:          0,
		Mean:           0,
		Total:          0,
		maxQPS:         0,
		minQPS:         math.MaxInt64,
		localCount:     0,
		localTimeCount: 0,
	}
	// start the collecting goroutine
	cc.wg.Add(1) // wait for the goroutine to start
	go cc.run()
	cc.wg.Wait()
	cc.wg.Add(1) // reused by WaitStop to wait until the goroutine has stopped
	return cc
}
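
// Collector lifecycle sketch (illustrative only; assumes the caller drives
// the collector directly rather than through a PerfMonitor):
//
//	cfg := perfm.NewConfig()
//	c := perfm.NewCollector(&cfg)
//	c.Start()
//	// ... workers call c.Collect and c.ReportError ...
//	c.WaitStop()
//	c.PrintResult(os.Stdout)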
// Start marks the start time.
func (c *Collector) Start() {
	c.startTime = time.Now()
}
func (c *Collector) run() {
	var cost int64
	var ok bool
	c.wg.Done() // signal that the collector goroutine has started
	for {
		select {
		case <-c.localtimer: // print timer, fires once per sampling window
			if c.localCount == 0 {
				continue
			}
			if !c.conf.NoPrint {
				// update the local QPS extremes
				if c.localCount > c.maxQPS {
					c.maxQPS = c.localCount
				} else if c.localCount < c.minQPS {
					c.minQPS = c.localCount
				}
				fmt.Println(c) // print a statistics line
				select {
				case err := <-c.errChan:
					fmt.Fprintf(os.Stderr, "[ERR]: %v\n", err)
				default:
				}
			}
			c.localCount = 0
			c.localTimeCount = 0
		case cost, ok = <-c.durationCache: // the main collection path
			if !ok {
				continue
			}
			c.Total++
			c.localCount++
			c.localTimeCount += cost
			c.histogram.Add(cost)
		case <-c.done: // close-notify channel
			// drain the remaining durations before exiting
			for cost := range c.durationCache {
				c.Total++
				c.localCount++
				c.localTimeCount += cost
				c.histogram.Add(cost)
			}
			// guard against a division by zero in String when the
			// window is empty
			if !c.conf.NoPrint && c.localCount > 0 {
				fmt.Println(c)
			}
			c.wg.Done() // signal that the goroutine has exited
			return
		}
	}
}
func (c *Collector) String() string {
	return fmt.Sprintf("%s\tCurrent Qps: %-6d\tAverage Qps: %-6d\tCumulative: %-8d\tCurrent Latency: %-7.3fms",
		time.Now().Format("15:04:05.000"), c.localCount, c.Total*1000/time.Since(c.startTime).Milliseconds(),
		c.Total, float64(c.localTimeCount)/float64(c.localCount)/1000000)
}
// WaitStop stops the collection: it closes the duration channel so the
// collector can drain it, then waits for the collector goroutine to exit.
func (c *Collector) WaitStop() {
	c.runningDuration = time.Since(c.startTime)
	close(c.durationCache) // stop accepting new durations
	close(c.done)
	c.wg.Wait()
}
// PrintResult prints the summary and histogram chart to the given io.Writer.
func (c *Collector) PrintResult(out io.Writer) {
	fmt.Fprintf(out, "\n==================SUMMARY=======================\n")
	// print error info
	if c.errCount != 0 {
		fmt.Fprintf(out, "Total errors: %v\t Error percentage: %.3f%%\n", c.errCount,
			float64(c.errCount*100)/float64(c.Total))
	}
	// print the histogram chart
	fmt.Fprintf(out, "%s\n", c.histogram)
	fmt.Fprintf(out, "Latency Max: %.3fms Min: %.3fms Mean: %.3fms STDEV: %.3fms CV: %.3f%% Variance: %.3f ms^2\n",
		float64(c.histogram.Max())/1000000, float64(c.histogram.Min())/1000000, c.histogram.Mean()/1000000,
		c.histogram.STDEV()/1000000, c.histogram.CV(), c.histogram.Variance()/1000000)
	if c.runningDuration.Milliseconds() > 0 {
		fmt.Fprintf(out, "Qps Max: %d Min: %d Mean: %d\n", c.maxQPS, c.minQPS, (c.Total-c.localCount)*1000/c.runningDuration.Milliseconds())
	}
	fmt.Fprintf(out, "Quantile:\n50%% in:\t%.3fms\n60%% in:\t%.3fms\n70%% in:\t%.3fms\n80%% in:\t%.3fms\n90%% in:\t%.3fms\n95%% in:\t%.3fms\n99%% in:\t%.3fms\n",
		float64(c.histogram.Quantile(0.5))/1000000, float64(c.histogram.Quantile(0.6))/1000000,
		float64(c.histogram.Quantile(0.7))/1000000, float64(c.histogram.Quantile(0.8))/1000000,
		float64(c.histogram.Quantile(0.9))/1000000, float64(c.histogram.Quantile(0.95))/1000000,
		float64(c.histogram.Quantile(0.99))/1000000)
	fmt.Fprintf(out, "================================================\n")
}
// Collect records one request duration and queues it for the histogram.
func (c *Collector) Collect(d time.Duration) {
	c.durationCache <- int64(d)
}
// ReportError counts an error and tries to queue it for printing.
// It is safe for concurrent use.
func (c *Collector) ReportError(e error) {
	atomic.AddInt64(&c.errCount, 1)
	select {
	case c.errChan <- e: // best effort: drop the error if the buffer is full
	default:
	}
}
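
// A per-request measurement loop inside a worker (illustrative only; assumes
// `c` is a *Collector from NewCollector, `job` implements Job, and `n` is the
// request budget, e.g. Config.Number):
//
//	for i := int64(0); i < n; i++ {
//		if err := job.Pre(); err != nil {
//			c.ReportError(err)
//			continue
//		}
//		start := time.Now()
//		if err := job.Do(); err != nil {
//			c.ReportError(err)
//			continue
//		}
//		c.Collect(time.Since(start))
//	}
//	job.After()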