-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnotifier.go
295 lines (258 loc) · 7.47 KB
/
notifier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
// Package gorsn implements utility routines for periodically monitoring a folder and
// its sub-folders content to detect any change such as file or folder deletion and
// creation along with content or permissions modification. Then it emits an appropriate
// event object on a consumable channel. The notifier system it provides could be stopped
// and status checked once started. It finally accepts a set of options which are safe to
// be modified by multiple goroutines even during its operations.
package gorsn
import (
"context"
"fmt"
"io/fs"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
)
const (
DEFAULT_QUEUE_SIZE = 10
DEFAULT_MAX_WORKERS = 1
DEFAULT_SCAN_INTERVAL = 1 * time.Second
)
// ScanNotifier is an interface which defines a set of available actions.
type ScanNotifier interface {
// Queue returns the channel to listen on for receiving changes events.
// The returned channel is read-only to avoid closing or writing on.
Queue() <-chan Event
// Start begins periodic scanning of root directory and emitting events.
Start(context.Context) error
// Stop aborts the scanning of root directory and sending events.
Stop() error
// IsRunning reports whether the scan notifier has started.
IsRunning() bool
// Flush clears internal cache history of files and directories under monitoring.
// Once succeeded, `CREATE` is the next event for each item under monitoring.
// This could be used directly after initialization of the scan notifier instance
// in order to receive the list of item (via `CREATE` event) inside root directory.
// Calling this while the scan notifier has started will make the scanner to detect
// each item like newly created into the root directory so the notifier will emit
// `CREATE` event for those items almost immediately.
Flush()
// Pause instructs the scanner to escape at each polling interval so no changes
// detection will happen then no new events will be sent.
// Use Resume() to restart the normal scanning and event notification processes.
Pause() error
// Resume restarts the scanner and notifier after being into `paused` state.
Resume() error
}
type pathInfos struct {
modTime time.Time
mode fs.FileMode
visited bool
}
type fsEntry struct {
path string
d fs.DirEntry
err error
}
type snotifier struct {
root string
opts *Options
paths sync.Map
queue chan Event
iqueue chan *fsEntry
stop chan struct{}
ready bool
wg *sync.WaitGroup
running atomic.Bool
stopping atomic.Bool
paused atomic.Bool
}
// Queue returns a read only channel of events.
func (sn *snotifier) Queue() <-chan Event {
return sn.queue
}
// Stop triggers the notifier routines to exit
// and to close the events queue.
func (sn *snotifier) Stop() error {
if sn.isStopping() {
return ErrScanIsStopping
}
if !sn.IsRunning() {
return ErrScanIsNotRunning
}
sn.stop <- struct{}{}
return nil
}
// IsRunning tells wether the scan notifier is still monitoring
// for new events or was stopped or was not started yet.
func (sn *snotifier) IsRunning() bool {
return sn.running.Load()
}
// isStopping tells wether the scan notifier is in the process of closing.
func (sn *snotifier) isStopping() bool {
return sn.stopping.Load()
}
// Flush remove all root directory items recent history.
func (sn *snotifier) Flush() {
sn.flush()
}
func (sn *snotifier) flush() {
if sn == nil {
return
}
sn.paths.Range(func(key interface{}, value interface{}) bool {
sn.paths.Delete(key)
return true
})
}
// New provides an initialized object which satisfies the ScanNotifier interface.
// It initializes itself based on the `root` value which is expected to be a
// path to an accessible directory. The content of the root directory will be
// parsed and loaded based on the options provided by `opts`. It returns and error
// which wraps `ErrInvalidRootDirPath` in case the root path is not an accessible
// directory. `ErrInitialization` means the initialization encoutered an error.
func New(root string, opts *Options) (ScanNotifier, error) {
if fi, err := os.Stat(root); err != nil || !fi.IsDir() {
return nil, fmt.Errorf("%w: %v", ErrInvalidRootDirPath, err)
}
opts = opts.setup()
sn := &snotifier{
root: root,
opts: opts,
paths: sync.Map{},
}
if err := filepath.WalkDir(sn.root, sn.init); err != nil {
return nil, fmt.Errorf("%w: %v", ErrInitialization, err)
}
sn.queue = make(chan Event, opts.queueSize)
sn.iqueue = make(chan *fsEntry, opts.queueSize)
sn.stop = make(chan struct{})
sn.wg = &sync.WaitGroup{}
sn.ready = true
return sn, nil
}
func (sn *snotifier) init(s string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
t := getPathType(d.Type())
if ignore, err := sn.check(s, t, err); ignore {
return err
}
if fi, err := d.Info(); err == nil {
sn.paths.Store(s, &pathInfos{fi.ModTime(), d.Type(), false})
}
return err
}
// Start is a blocking method which pre-boots the consumers
// and starts the infinite loop scanner to monitor the root
// directory contents.
func (sn *snotifier) Start(ctx context.Context) error {
if sn.IsRunning() {
return ErrScanAlreadyStarted
}
if sn.isStopping() {
return ErrScanIsStopping
}
if !sn.ready {
return ErrScanIsNotReady
}
sn.running.Store(true)
// sn.workers()
sn.scanner(ctx)
return nil
}
// scanner runs an infinite scan loop after each interval of time.
// it exits on context cancellation or on call to stop the notifier.
func (sn *snotifier) scanner(ctx context.Context) {
var done atomic.Bool
for {
select {
case <-sn.stop:
sn.finalize()
return
case <-ctx.Done():
sn.finalize()
return
default:
if sn.paused.Load() {
time.Sleep(sn.opts.scanInterval.Load().(time.Duration))
continue
}
done.Store(false)
sn.workers(&done)
filepath.WalkDir(sn.root, sn.scan)
done.Store(true)
sn.wg.Wait()
if !sn.opts.event.ignoreDelete.Load() {
sn.missingPaths()
}
time.Sleep(sn.opts.scanInterval.Load().(time.Duration))
}
}
}
func (sn *snotifier) scan(s string, d fs.DirEntry, err error) error {
t := getPathType(d.Type())
if ignore, cerr := sn.check(s, t, err); ignore {
return cerr
}
fse := &fsEntry{path: s, d: d}
if err != nil {
fse.err = err
}
sn.iqueue <- fse
return nil
}
// missingPaths scans all latest registered paths to find
// deleted paths and trigger a `DELETE` event for each if
// this option was enabled. It aborts once the notifier is
// stopped.
func (sn *snotifier) missingPaths() {
sn.paths.Range(func(key, value any) bool {
if !sn.running.Load() {
return false
}
path := key.(string)
pi := value.(*pathInfos)
if pi.visited {
pi.visited = false
return true
}
if !sn.opts.event.ignoreDelete.Load() {
ev := Event{path, getPathType(pi.mode), DELETE, nil}
sn.queueEvent(ev)
}
sn.paths.Delete(path)
return true
})
}
// Pause triggers the scanner routine to escape at each intervall
// so that no new changes will be detected and no events to be sent.
func (sn *snotifier) Pause() error {
if sn.isStopping() {
return ErrScanIsStopping
}
if !sn.IsRunning() {
return ErrScanIsNotRunning
}
sn.paused.Store(true)
return nil
}
// Resume triggers the scanner routine to restart checking files
// and sending events on changes detection. It is expected to be
// called after the scannotifier is into `paused` state.
func (sn *snotifier) Resume() error {
if sn.isStopping() {
return ErrScanIsStopping
}
if !sn.IsRunning() {
return ErrScanIsNotRunning
}
if !sn.paused.Load() {
return ErrScanIsNotPaused
}
sn.paused.Store(false)
return nil
}