-
Notifications
You must be signed in to change notification settings - Fork 5
/
controller.go
195 lines (169 loc) · 5.59 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package httprc
import (
"context"
"fmt"
"time"
)
// Controller manages a set of `httprc.Resource` objects identified by their
// URL: resources can be added, looked up, removed and refreshed, and the
// controller itself can be shut down.
type Controller interface {
// Add adds a new `httprc.Resource` to the controller. If the resource already exists,
// it will return an error.
Add(context.Context, Resource, ...AddOption) error
// Lookup a `httprc.Resource` by its URL. If the resource does not exist, it
// will return an error.
Lookup(context.Context, string) (Resource, error)
// Remove a `httprc.Resource` from the controller by its URL. If the resource does
// not exist, it will return an error.
Remove(context.Context, string) error
// Refresh forces a resource to be refreshed immediately. If the resource does
// not exist, or if the refresh fails, it will return an error.
Refresh(context.Context, string) error
// ShutdownContext stops the controller and waits until it has finished
// shutting down or the given context is done, whichever comes first. In the
// latter case the context's error is returned.
ShutdownContext(context.Context) error
// Shutdown is a convenience method that calls ShutdownContext with a context
// that times out after the given duration.
Shutdown(time.Duration) error
}
// controller is the type that carries the Add, Lookup, Remove, Refresh,
// Shutdown and ShutdownContext methods defined in this file. Those methods do
// not access the stored resources themselves: they send request values over
// `incoming` and wait for a reply.
type controller struct {
// cancel is invoked by ShutdownContext to initiate the shutdown.
cancel context.CancelFunc
// NOTE(review): check is not used in this file; presumably the ticker that
// drives periodic checks of the registered resources — confirm.
check *time.Ticker
// incoming accepts new control requests from external sources
// (in this file: addRequest, rmRequest, refreshRequest and lookupRequest values).
incoming chan any
// outgoing sends Syncer objects to the worker pool
outgoing chan Resource
// NOTE(review): traceSink is not used in this file; presumably the destination
// for trace messages — confirm.
traceSink TraceSink
// NOTE(review): syncoutgoing is not used in this file; it carries
// synchronousRequest values, presumably for refreshes that must be waited
// on — confirm.
syncoutgoing chan synchronousRequest
// NOTE(review): items is not accessed in this file; presumably the registered
// resources keyed by URL — confirm.
items map[string]Resource
// NOTE(review): tickInterval is not used in this file; presumably the period
// of the `check` ticker — confirm.
tickInterval time.Duration
// shutdown is the channel ShutdownContext waits on after calling cancel; a
// receive from it succeeding is treated as "shutdown complete". Whoever
// closes or sends on it is not in this file.
shutdown chan struct{}
// wl is consulted by Add: resources whose URL it does not allow are rejected.
wl Whitelist
// NOTE(review): defaultMaxInterval and defaultMinInterval are not used in this
// file; presumably the default bounds for a resource's refresh interval — confirm.
defaultMaxInterval time.Duration
defaultMinInterval time.Duration
}
// Shutdown stops the controller, giving up once `timeout` has elapsed.
//
// It is shorthand for calling ShutdownContext with a context derived from
// context.Background() through context.WithTimeout, and returns whatever
// ShutdownContext returns.
func (c *controller) Shutdown(timeout time.Duration) error {
	deadlineCtx, release := context.WithTimeout(context.Background(), timeout)
	defer release()

	return c.ShutdownContext(deadlineCtx)
}
// ShutdownContext asks the client and all associated goroutines to stop by
// invoking the controller's cancel function, then blocks until the shutdown
// channel yields (nil is returned) or ctx is done (ctx.Err() is returned).
//
// Because a finished ctx makes this function return immediately, you should
// not pass the context that was used to start the client: by the time you
// shut down, it has presumably been canceled already.
//
// Waiting for the client shutdown to complete also ensures that all sinks are
// properly flushed.
func (c *controller) ShutdownContext(ctx context.Context) error {
	c.cancel()

	var err error
	select {
	case <-c.shutdown:
		// Shutdown completed; err stays nil.
	case <-ctx.Done():
		err = ctx.Err()
	}
	return err
}
// ctrlRequest is the generic shape shared by the control requests declared in
// this file. T is the type of the value delivered on the reply channel.
type ctrlRequest[T any] struct {
// reply is the channel on which the response to this request is received.
reply chan T
// resource is the Resource the request applies to (set by Add).
resource Resource
// u is the URL identifying the target resource (set by Lookup, Remove and Refresh).
u string
}
// The request types below are distinct defined types built on ctrlRequest, so
// that a value sent over a `chan any` can be told apart by its type.

// addRequest is the request built by Add; its reply carries only an error.
type addRequest ctrlRequest[backendResponse[struct{}]]
// rmRequest is the request built by Remove; its reply carries only an error.
type rmRequest ctrlRequest[backendResponse[struct{}]]
// refreshRequest is the request built by Refresh; its reply carries only an error.
type refreshRequest ctrlRequest[backendResponse[struct{}]]
// lookupRequest is the request built by Lookup; its reply carries the Resource
// that was found, or an error.
type lookupRequest ctrlRequest[backendResponse[Resource]]
// synchronousRequest is the element type of the controller's syncoutgoing
// channel. NOTE(review): it is not constructed in this file; presumably used
// for refreshes the caller waits on — confirm.
type synchronousRequest ctrlRequest[backendResponse[struct{}]]
// adjustIntervalRequest is a request that carries a single Resource and has no
// reply channel.
// NOTE(review): it is not used in this file; presumably it asks the controller
// to re-evaluate the refresh interval of the resource — confirm.
type adjustIntervalRequest struct {
// resource is the Resource the request refers to.
resource Resource
}
// backendResponse is the value delivered on a request's reply channel.
// sendBackend unpacks it and returns (payload, err) to its caller.
type backendResponse[T any] struct {
// payload is the result of the request; struct{} is used when there is no result.
payload T
// err is the error to report to the caller, or nil.
err error
}
// sendBackend hands the request v to the backend over backendCh and then
// waits for the corresponding response on replyCh.
//
// TReq is the request type and TB is the payload type of the response. On
// success the payload and error found in the response are returned unchanged.
// If ctx is done before the request could be handed over, or before a
// response arrives, the zero value of TB is returned together with ctx.Err().
func sendBackend[TReq any, TB any](ctx context.Context, backendCh chan any, v TReq, replyCh chan backendResponse[TB]) (TB, error) {
	var zero TB

	select {
	case <-ctx.Done():
		// The request was never enqueued, so nothing readable from replyCh can
		// be an answer to it. Return right away instead of falling through to
		// the wait below, where a stale reply could be picked up at random.
		return zero, ctx.Err()
	case backendCh <- v:
	}

	select {
	case <-ctx.Done():
		return zero, ctx.Err()
	case res := <-replyCh:
		return res.payload, res.err
	}
}
// Lookup asks the controller backend for the resource registered under the
// URL u and returns it. If the resource does not exist, an error is returned.
//
// Because of the way type parameters work in Go, the result can only be the
// plain Resource interface (not a ResourceBase[T]). To reach the actual object
// you are looking for, either call `Resource.Get()` or use a type assertion to
// obtain a `ResourceBase[T]`.
func (c *controller) Lookup(ctx context.Context, u string) (Resource, error) {
	// Reply channel with capacity 1, handed to the backend inside the request.
	replyCh := make(chan backendResponse[Resource], 1)

	return sendBackend(ctx, c.incoming, lookupRequest{u: u, reply: replyCh}, replyCh)
}
// Add registers the resource r with the controller. If the resource already
// exists, an error is returned. If the controller's whitelist does not allow
// r.URL(), an error wrapping errBlockedByWhitelist is returned and no request
// is sent to the backend.
//
// Unless told otherwise, Add then waits for the resource to be fetched once by
// calling `r.Ready()`. Note that this wait does NOT time out on its own: give
// ctx a deadline (e.g. via `context.WithTimeout`) if you need one. Pass the
// `WithWaitReady(false)` option to skip the wait altogether.
func (c *controller) Add(ctx context.Context, r Resource, options ...AddOption) error {
	shouldWait := true
	//nolint:forcetypeassert
	for _, opt := range options {
		if opt.Ident() == (identWaitReady{}) {
			shouldWait = opt.(addOption).Value().(bool)
		}
	}

	if !c.wl.IsAllowed(r.URL()) {
		return fmt.Errorf(`httprc.Controller.AddResource: cannot add %q: %w`, r.URL(), errBlockedByWhitelist)
	}

	// Reply channel with capacity 1, handed to the backend inside the request.
	replyCh := make(chan backendResponse[struct{}], 1)
	_, err := sendBackend(ctx, c.incoming, addRequest{resource: r, reply: replyCh}, replyCh)
	if err != nil {
		return err
	}

	if !shouldWait {
		return nil
	}
	return r.Ready(ctx)
}
// Remove asks the controller backend to drop the resource registered under
// the URL u. If the resource does not exist, an error is returned.
func (c *controller) Remove(ctx context.Context, u string) error {
	// Reply channel with capacity 1, handed to the backend inside the request.
	replyCh := make(chan backendResponse[struct{}], 1)

	_, err := sendBackend(ctx, c.incoming, rmRequest{u: u, reply: replyCh}, replyCh)
	return err
}
// Refresh asks the controller backend to refresh the resource registered
// under the URL u right away. An error is returned if the resource does not
// exist or if the refresh fails.
//
// The call is synchronous: it blocks until the resource has been refreshed
// (or until ctx is done).
func (c *controller) Refresh(ctx context.Context, u string) error {
	// Reply channel with capacity 1, handed to the backend inside the request.
	replyCh := make(chan backendResponse[struct{}], 1)

	_, err := sendBackend(ctx, c.incoming, refreshRequest{u: u, reply: replyCh}, replyCh)
	return err
}