-
Notifications
You must be signed in to change notification settings - Fork 47
/
agent.go
387 lines (329 loc) · 9.98 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
// Copyright (c) 2018 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"errors"
"fmt"
"os"
"os/signal"
"runtime"
"strings"
"sync"
"time"
"github.com/namsral/flag"
"go.ligato.io/cn-infra/v2/config"
"go.ligato.io/cn-infra/v2/infra"
"go.ligato.io/cn-infra/v2/logging"
"go.ligato.io/cn-infra/v2/logging/measure"
"go.ligato.io/cn-infra/v2/utils/once"
)
// agentLogger is the logger the agent uses for its own lifecycle messages.
// It is created from the default logging registry under the name "agent".
var agentLogger = logging.DefaultRegistry.NewLogger("agent")
// Build metadata. These variables are meant to be overridden at build time
// using ldflags; the values below apply when they are not overridden.
var (
// BuildVersion describes the version of the build. It is usually set using `git describe --always --tags --dirty`.
BuildVersion = "v0.0.0-dev"
// BuildDate describes the time of the build.
BuildDate string
// CommitHash describes the commit hash of the build.
CommitHash string
)
// Agent implements startup & shutdown procedures for plugins.
type Agent interface {
// Run is a blocking call which starts the agent with all of its plugins,
// waits for a signal from OS (SIGINT, SIGTERM by default), context cancellation or
// close of quit channel (can be set via options) and then stops the agent.
// Returns nil if all the plugins were initialized and closed successfully.
Run() error
// Start starts the agent with all the plugins, calling their Init() and optionally AfterInit().
// Returns nil if all the plugins were initialized successfully.
Start() error
// Stop stops the agent with all the plugins, calling their Close().
// Returns nil if all the plugins were closed successfully.
Stop() error
// Options returns all agent's options configured via constructor.
Options() Options
// Wait waits until the agent is stopped and returns the same error as Stop().
Wait() error
// After returns a channel that is closed when the agent is stopped.
// Note: It is not certain that all plugins were closed successfully, see Error().
After() <-chan struct{}
// Error returns the error that occurred when the agent was stopped.
// Note: This essentially just calls Stop().
Error() error
}
// NewAgent builds an Agent from the supplied options.
//
// Unless the command-line flags have already been parsed, it first defines the
// config directory flag and the flags of every configured plugin (keyed by the
// plugin's String() name) and then parses the flags.
func NewAgent(opts ...Option) Agent {
	cfg := newOptions(opts...)

	// Flags may have been parsed already; only define and parse them when
	// that has not happened yet.
	if flagsPending := !flag.Parsed(); flagsPending {
		config.DefineDirFlag()
		for i := range cfg.Plugins {
			pluginName := cfg.Plugins[i].String()
			infraLogger.Debugf("registering flags for: %q", pluginName)
			config.DefineFlagsFor(pluginName)
		}
		flag.Parse()
	}

	a := &agent{
		opts:   cfg,
		tracer: measure.NewTracer("agent-plugins"),
	}
	return a
}
// agent is the Agent implementation returned by NewAgent.
type agent struct {
// opts holds the options the agent was constructed with.
opts Options
// stopCh is nil until the agent has started successfully; it is allocated
// by starter() and closed when stop() returns.
stopCh chan struct{}
// startOnce and stopOnce run starter() and stopper() on behalf of Start()
// and Stop(). NOTE(review): presumably they execute the function only once
// and replay its error on later calls - confirm against the once package.
startOnce once.ReturnError
stopOnce once.ReturnError
// tracer records how long each plugin's Init and AfterInit took.
tracer measure.Tracer
// mu guards curPlugin.
mu sync.Mutex
// curPlugin is the plugin currently being initialized by start(); it is
// reset to nil once all plugins are initialized and is reported when the
// start timeout fires.
curPlugin infra.Plugin
}
// Options returns the Options the agent was created with.
func (a *agent) Options() Options {
return a.opts
}
// Start starts the agent. Start will return as soon as the Agent is ready. The Agent continues
// running after Start returns.
//
// The actual startup is done by starter(), which is invoked through startOnce.
func (a *agent) Start() error {
return a.startOnce.Do(a.starter)
}
// Stop stops the Agent, calling Close on all plugins.
//
// The actual shutdown is done by stopper(), which is invoked through stopOnce.
func (a *agent) Stop() error {
return a.stopOnce.Do(a.stopper)
}
// Run starts the agent and then blocks in Wait until the agent has been
// stopped. If starting fails, the start error is returned immediately and
// Wait is not called; otherwise the result of Wait is returned.
func (a *agent) Run() error {
	err := a.Start()
	if err == nil {
		err = a.Wait()
	}
	return err
}
// starter performs the actual agent startup; Start runs it through startOnce.
//
// It logs the build information, subscribes to the configured quit signals,
// initializes all plugins via start() and, on success, allocates stopCh and
// spawns a goroutine that stops the agent as soon as the quit channel is
// closed, the context is canceled, a quit signal arrives or the agent is
// stopped explicitly.
//
// When StartTimeout is positive, a watchdog goroutine runs for the duration of
// the startup and terminates the process with exit code 1 if a quit signal is
// received during startup or if startup does not finish within the timeout.
//
// Returns the error from start(), or nil when all plugins were started.
func (a *agent) starter() error {
	agentLogger.WithFields(logging.Fields{
		"CommitHash": CommitHash,
		"BuildDate":  BuildDate,
	}).Infof("Starting agent version: %v", BuildVersion)

	// If we want to properly handle cleanup when a SIG comes in *during*
	// agent startup (ie, clean up after its finished) we need to register
	// for the signal before we start() the agent
	sig := make(chan os.Signal, 1)
	if len(a.opts.QuitSignals) > 0 {
		signal.Notify(sig, a.opts.QuitSignals...)
	}

	// started is closed as soon as start() has returned, whether it
	// succeeded or failed; it releases the startup watchdog below.
	started := make(chan struct{})
	if timeout := a.opts.StartTimeout; timeout > 0 {
		go func() {
			select {
			case s := <-sig:
				agentLogger.Infof("Signal %v received during agent start, stopping", s)
				os.Exit(1)
			case <-started:
				// startup finished, nothing left to watch
			case <-time.After(timeout):
				a.mu.Lock()
				curPlugin := a.curPlugin
				a.mu.Unlock()
				agentLogger.Errorf("Agent failed to start before timeout (%v) last plugin: %s", timeout, curPlugin)
				dumpStacktrace()
				os.Exit(1)
			}
		}()
	}

	t := time.Now()
	err := a.start()
	// Release the watchdog on failure as well. Otherwise it would outlive
	// this call and kill the process once the timeout expires, even though
	// the start error has already been handed to the caller.
	close(started)
	if err != nil {
		signal.Stop(sig)
		return err
	}

	agentLogger.Infof("Agent started with %d plugins (took %v)",
		len(a.opts.Plugins), time.Since(t).Round(time.Millisecond))

	a.stopCh = make(chan struct{}) // If we are started, we have a stopCh to signal stopping

	// The agent started, so there are things to clean up if a SIG comes in;
	// fire off a goroutine to do that.
	go func() {
		// A nil channel blocks forever, so the context case is simply
		// never selected when no context was configured.
		var quit <-chan struct{}
		if a.opts.Context != nil {
			quit = a.opts.Context.Done()
		}

		// Wait for signal or agent stop
		select {
		case <-a.opts.QuitChan:
			agentLogger.Info("Quit channel closed, stopping.")
		case <-quit:
			agentLogger.Info("Context canceled, stopping.")
		case s := <-sig:
			agentLogger.Infof("Signal %v received, stopping.", s)
		case <-a.stopCh:
			// agent stopped
		}

		// Doesn't hurt to call Stop twice, its idempotent because of the
		// stopOnce. The result is deliberately dropped here; callers
		// retrieve it through Wait() or Error().
		_ = a.Stop()
		signal.Stop(sig)
	}()

	return nil
}
// start initializes all configured plugins in two passes: first Init() is
// called on every plugin in order, then AfterInit() is called on every plugin
// that implements infra.PostInit. The plugin currently being processed is
// published in curPlugin (under mu) so that the start-timeout watchdog can
// report it; curPlugin is reset to nil once both passes are done.
//
// The duration of every Init/AfterInit call is recorded in the tracer and,
// when printPluginStartDurations is set and the infra logger is at debug level
// or more verbose, a summary of the durations is printed to stdout.
//
// Returns the first error returned by a plugin's Init or AfterInit; the
// remaining plugins are not processed in that case.
func (a *agent) start() error {
	agentLogger.Debugf("starting %d plugins", len(a.opts.Plugins))

	// Init plugins
	for _, plugin := range a.opts.Plugins {
		t := time.Now()

		a.mu.Lock()
		a.curPlugin = plugin
		a.mu.Unlock()

		agentLogger.Debugf("-> Init(): %v", plugin)
		if err := plugin.Init(); err != nil {
			return err
		}
		a.tracer.LogTime(fmt.Sprintf("%v.Init", plugin), t)
	}

	// AfterInit plugins
	for _, plugin := range a.opts.Plugins {
		t := time.Now()

		a.mu.Lock()
		a.curPlugin = plugin
		a.mu.Unlock()

		if postPlugin, ok := plugin.(infra.PostInit); ok {
			agentLogger.Debugf("-> AfterInit(): %v", plugin)
			if err := postPlugin.AfterInit(); err != nil {
				return err
			}
		} else {
			agentLogger.Debugf("-- AfterInit(): %v (not used)", plugin)
		}
		a.tracer.LogTime(fmt.Sprintf("%v.AfterInit", plugin), t)
	}

	a.mu.Lock()
	a.curPlugin = nil
	a.mu.Unlock()

	if printPluginStartDurations && infraLogger.GetLevel() >= logging.DebugLevel {
		var b strings.Builder
		b.WriteString("plugin start durations:\n")
		for _, entry := range a.tracer.Get().GetTracedEntries() {
			dur := "<1ms"
			if d := time.Duration(entry.Duration); d > time.Millisecond {
				dur = d.Round(time.Millisecond).String()
			}
			fmt.Fprintf(&b, " - %v: %v\n", entry.MsgName, dur)
		}
		// Print the summary verbatim: it is data, not a format string, so
		// a '%' in an entry name must not be interpreted as a verb.
		fmt.Fprint(os.Stdout, b.String())
	}

	return nil
}
// stopper performs the actual agent shutdown; Stop runs it through stopOnce.
//
// When StopTimeout is positive, a watchdog goroutine terminates the process
// with exit code 1 if stop() has not returned within the timeout. The watchdog
// is released when this function returns.
//
// Returns the error from stop(), or nil when the agent stopped cleanly.
func (a *agent) stopper() error {
	agentLogger.Infof("Stopping agent")

	done := make(chan struct{})
	defer close(done)

	if limit := a.opts.StopTimeout; limit > 0 {
		go func() {
			expired := time.After(limit)
			select {
			case <-expired:
				agentLogger.Errorf("agent failed to stop before timeout (%v)", limit)
				dumpStacktrace()
				os.Exit(1)
			case <-done:
				// shutdown finished in time
			}
		}()
	}

	err := a.stop()
	if err == nil {
		agentLogger.Info("Agent stopped")
	}
	return err
}
// stop closes all plugins in the reverse of their configured order and closes
// stopCh when it returns.
//
// If the agent was never started (stopCh is nil), an error is logged and
// returned and nothing is closed. Otherwise the first error returned by a
// plugin's Close() is returned and the remaining plugins are left untouched.
func (a *agent) stop() error {
	if a.stopCh == nil {
		notStarted := errors.New("attempted to stop an agent that was not Started")
		agentLogger.Error(notStarted)
		return notStarted
	}

	agentLogger.Debugf("stopping %d plugins", len(a.opts.Plugins))
	defer close(a.stopCh)

	// Walk the plugin list from the last entry down to the first.
	for remaining := len(a.opts.Plugins); remaining > 0; remaining-- {
		plugin := a.opts.Plugins[remaining-1]
		agentLogger.Debugf("-> Close(): %v", plugin)
		if err := plugin.Close(); err != nil {
			return err
		}
	}

	return nil
}
// Wait blocks until stopCh is closed, i.e. until the agent has been stopped,
// and then returns the result of Stop().
//
// If the agent was never started (stopCh is nil), an error is logged and
// returned immediately.
func (a *agent) Wait() error {
	if a.stopCh != nil {
		<-a.stopCh
		// Stop() has already run at this point; calling it again only
		// retrieves the error (if any) kept by stopOnce.
		return a.Stop()
	}

	notStarted := errors.New("attempted to wait on an agent that wasn't Started")
	agentLogger.Error(notStarted)
	return notStarted
}
// After returns a channel that is closed when the agent is stopped.
// To retrieve any error from the agent stopping, call Error() on the agent.
// The normal pattern of use is:
//
//	agent := NewAgent(options...)
//	agent.Start()
//	select {
//	case <-agent.After(): // Will wait till the agent is stopped
//	...
//	}
//	err := agent.Error() // Will return any error from the agent being stopped
func (a *agent) After() <-chan struct{} {
	if a.stopCh == nil {
		// The agent has not started, so there is no stopCh to hand out;
		// allocating it is reserved for the startup path. A nil channel
		// would block receivers forever, so return an already-closed one
		// instead. That keeps the usual "select on After(), then ask for
		// the error" pattern working: the select falls through at once
		// and Stop() reports that the agent was never started.
		closed := make(chan struct{})
		close(closed)
		return closed
	}
	return a.stopCh
}
// Error returns any error that occurred when the agent was stopped.
func (a *agent) Error() error {
// a.Stop() returns whatever error occurred when stopping the agent.
// This is because of stopOnce.
// If you try to retrieve an error before the agent is started, you will get
// an error complaining that the agent isn't started.
return a.Stop()
}
// dumpStacktrace writes the stack traces of all goroutines to stderr. It does
// nothing unless DumpStackTraceOnTimeout is set.
func dumpStacktrace() {
	if DumpStackTraceOnTimeout {
		// runtime.Stack truncates the output to the buffer it is given, so
		// keep doubling the buffer until the whole dump fits.
		for size := 1024; ; size *= 2 {
			trace := make([]byte, size)
			if written := runtime.Stack(trace, true); written < size {
				os.Stderr.Write(trace[:written])
				return
			}
		}
	}
}