-
Notifications
You must be signed in to change notification settings - Fork 3
/
service.go
223 lines (195 loc) · 5.53 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
/*Package core consists of a set of packages which are used in writing micro-service
applications.
Each package defines conventional ways of handling common tasks,
as well as a suite of tests to verify their behaviour.
*/
package core
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
)
var (
// tag and ref can be used by build stage with build flags to set things like git commit hash and tag.
// --ldflags "-X github.com/LUSHDigital/core.tag=${GIT_TAG}"
// --ldflags "-X github.com/LUSHDigital/core.ref=${GIT_COMMIT_HASH}"
tag string
ref string
)
// Service represents the minimal information required to define a working service.
type Service struct {
// Name represents the name of the service, typically the same as the github repository.
Name string `json:"name"`
// Type represents the type of the service, eg. service or aggregator.
Type string `json:"type"`
// Version represents the latest version or SVC tag of the service.
Version string `json:"version"`
// Revision represents the SVC revision or commit hash of the service.
Revision string `json:"revision"`
// GracePeriod represents the duration workers have to clean up before the process gets killed.
GracePeriod time.Duration `json:"grace_period"`
}
// NewService creates a new service based on
func NewService(name, kind string) *Service {
if v := os.Getenv("SERVICE_VERSION"); v != "" {
tag = v
}
if r := os.Getenv("SERVICE_REVISION"); r != "" {
ref = r
}
return &Service{
Name: name,
Type: kind,
Version: tag,
Revision: ref,
}
}
// Runner represents the behaviour for running a service worker.
type Runner interface {
// Run should run start processing the worker and be a blocking operation.
Run(context.Context) error
}
// Halter represents the behaviour for stopping a service worker.
type Halter interface {
// Halt should tell the worker to stop doing work.
Halt(context.Context) error
}
// Worker represents the behaviour for a service worker.
type Worker interface {
Runner
Halter
}
// StartWorkers will start the given service workers and block block indefinitely, until interupted.
// The process with an appropriate status code.
// DEPRECATED: Use MustRun in favour of StartWorkers.
func (s *Service) StartWorkers(ctx context.Context, workers ...Worker) {
log.Println("DEPRECATED: Use core.MustRun in favour of core.StartWorkers")
s.MustRun(ctx, workers...)
}
// MustRun will start the given service workers and block block indefinitely, until interupted.
// The process with an appropriate status code.
func (s *Service) MustRun(ctx context.Context, workers ...Worker) {
os.Exit(s.Run(ctx, workers...))
}
// Run will start the given service workers and block block indefinitely, until interupted.
func (s *Service) Run(ctx context.Context, workers ...Worker) int {
const fail int = 1
nWorkers := len(workers)
if nWorkers < 1 {
log.Println("need at least 1 service worker")
return fail
}
if err := s.validate(); err != nil {
log.Println(err)
return fail
}
var (
cancelled <-chan int
completed <-chan int
done, cancel func()
)
ctx, cancelled, cancel = ContextWithSignals(ctx)
completed, cancelled, done = WaitWithTimeout(nWorkers, cancelled, s.grace())
var run = func(ctx context.Context, worker Worker, done, cancel func()) {
if err := worker.Run(ctx); err != nil {
log.Println("service errored:", err)
go cancel()
}
done()
}
var halt = func(ctx context.Context, worker Worker) {
if err := worker.Halt(ctx); err != nil {
log.Println("service halted:", err)
}
}
log.Printf("starting %s: %s", s.Type, s.name())
for _, worker := range workers {
go run(ctx, worker, done, cancel)
}
for {
select {
case <-cancelled:
for _, worker := range workers {
go halt(ctx, worker)
}
case code := <-completed:
message := "shutdown gracefully..."
if code > 0 {
message = "failed to shutdown gracefully: killing!"
}
log.Println(message)
return code
}
}
}
func (s *Service) validate() error {
if s.Name == "" || s.Type == "" {
return fmt.Errorf("cannot start without a name or type")
}
return nil
}
func (s *Service) name() string {
var w strings.Builder
w.WriteString(s.Name)
if len(s.Revision) > 5 {
w.WriteString(" (" + s.Revision[0:6] + ")")
}
if s.Version != "" {
w.WriteString(" " + s.Version)
}
return w.String()
}
func (s *Service) grace() time.Duration {
grace := s.GracePeriod
if grace == 0 {
grace = time.Second * 5
}
return grace
}
// WaitWithTimeout will wait for a number of pieces of work has finished and send a message on the completed channel.
func WaitWithTimeout(delta int, cancelled <-chan int, timeout time.Duration) (<-chan int, <-chan int, func()) {
completedC := make(chan int, 1)
cancelledC := make(chan int, 1)
wg := &sync.WaitGroup{}
wg.Add(delta)
go func(wg *sync.WaitGroup) {
wg.Wait()
completedC <- 0
}(wg)
go func() {
select {
case code := <-cancelled:
cancelledC <- code
time.Sleep(timeout)
completedC <- code
}
}()
return completedC, cancelledC, wg.Done
}
// ContextWithSignals creates a new instance of signal context.
func ContextWithSignals(ctx context.Context) (context.Context, <-chan int, context.CancelFunc) {
var cancelCtx context.CancelFunc
ctx, cancelCtx = context.WithCancel(ctx)
sigs := make(chan os.Signal, 1)
cancelled := make(chan int, 1)
signal.Notify(sigs,
syscall.SIGINT,
syscall.SIGTERM,
)
var cancel = func() {
cancelCtx()
cancelled <- 1
}
go func() {
sig := <-sigs
log.Printf("received signal: %s", sig)
cancel()
}()
return ctx, cancelled, cancel
}