-
Notifications
You must be signed in to change notification settings - Fork 3
/
manager.go
212 lines (184 loc) · 4.39 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
package perfm
import (
"fmt"
"os"
"os/signal"
"sync"
"sync/atomic"
"time"
)
// Monitor implemeneted perfMonitor
type Monitor struct {
c Config //configration for perfm
done chan struct{} //stop the perfm, used in duratoin worker
total int64 //total request, used in total worker
wg sync.WaitGroup //wait group to block the stop and sync the work thread
starter chan struct{}
collector *Collector //get the request cost from every done()
//job implement benchmark job
//error occoured in job.Do will be collected
job Job
}
// NewMonitor generate perfm
func NewMonitor(c Config) PerfMonitor {
return &Monitor{
c: c,
done: make(chan struct{}),
wg: sync.WaitGroup{},
starter: make(chan struct{}),
collector: NewCollector(&c),
total: 0,
}
}
// Reset regist a job into Monitor fro benchmark
// This operation will reset the PerfMonitor
func (p *Monitor) Reset(job Job) {
p.job = job
// reset monitor
p.done = make(chan struct{})
p.wg = sync.WaitGroup{}
}
// TODO merge into one
// totalworker
// durationworker
func (p *Monitor) totalWorker() {
// copy local job
job, err := p.job.Copy()
if err != nil {
fmt.Println("error in do copy", err)
return
}
// defer clean job
defer job.After()
var start time.Time
// done local wg
p.wg.Done()
var l int64
// wait for start
<-p.starter
for { // main work loop
select {
case <-p.done: // on close
p.wg.Done()
return
default:
// check if the request reach the goal
if l = atomic.AddInt64(&p.total, 1); l > p.c.Number {
if l == p.c.Number+1 { // double check, the last worker
// only one should do close
close(p.done)
}
// other goroutine exit now
// TODO XXX continue?
atomic.AddInt64(&p.total, -1)
p.wg.Done()
return
}
if err = job.Pre(); err != nil {
fmt.Println("error in do pre job", err)
p.wg.Done()
return
}
start = time.Now()
err = job.Do()
p.collector.Collect(time.Since(start))
if err != nil {
p.collector.ReportError(err)
}
}
}
}
func (p *Monitor) durationWorker() {
// copy local job
job, err := p.job.Copy()
if err != nil {
fmt.Println("error in do copy", err)
return
}
// defer clean job
defer job.After()
var start time.Time
// done local wg
p.wg.Done()
// wait for start
<-p.starter
for { // main work loop
select {
case <-p.done: // on close
p.wg.Done()
return
default:
atomic.AddInt64(&p.total, 1)
if err = job.Pre(); err != nil {
fmt.Println("error in do pre job", err)
p.wg.Done()
return
}
start = time.Now()
err = job.Do()
p.collector.Collect(time.Since(start))
if err != nil {
p.collector.ReportError(err)
}
}
}
}
func (p *Monitor) processSiginter() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, os.Interrupt)
<-sigchan
close(p.done)
p.wg.Wait() // wait worker for exit
// send close to collector, wait for collection done
p.collector.WaitStop()
fmt.Println("===============SIGINT RECEIVED====================")
// then do print
p.collector.PrintResult(os.Stdout)
// wait job done and do summarize
p.wg.Wait()
os.Exit(0)
}
// Start the benchmark with given arguments on regisit
func (p *Monitor) Start(j Job) {
if j != nil {
p.job = j
}
if p.job == nil {
panic("error job does not registered yet")
}
go p.processSiginter()
// If job implement descripetion as Stringer
if _, ok := p.job.(fmt.Stringer); ok {
fmt.Println(p.job)
}
fmt.Println("==================JOB STARTED====================")
// Steps:
// 1. start all job, wait on wg
// 2. run all jobs, start benchmark
// wait all job goroutine created
p.wg.Add(p.c.Parallel)
for i := 0; i < p.c.Parallel; i++ {
if p.c.Number != 0 {
go p.totalWorker()
} else {
go p.durationWorker()
}
}
p.wg.Wait()
// now all goroutine created successfully, add wg again before start
// this is used for exit waitting
p.wg.Add(p.c.Parallel)
p.collector.Start() // mark the start time
close(p.starter) // send signal, start all jobs
if p.c.Number == 0 { // duration mode, sleep then stop
time.Sleep(time.Second * time.Duration(p.c.Duration))
close(p.done)
}
p.wg.Wait() // wait workers exit
// send close to collector, wait until collection done
p.collector.WaitStop()
fmt.Println("===================JOB DONE=======================")
p.collector.PrintResult(os.Stdout)
// wait job done and do summarize
p.wg.Wait()
}