-
-
Notifications
You must be signed in to change notification settings - Fork 53
/
modes.go
88 lines (71 loc) · 1.7 KB
/
modes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package stateless
import (
"context"
"sync"
"sync/atomic"
)
type fireMode interface {
Fire(ctx context.Context, trigger Trigger, args ...any) error
Firing() bool
}
type fireModeImmediate struct {
ops atomic.Uint64
sm *StateMachine
}
func (f *fireModeImmediate) Firing() bool {
return f.ops.Load() > 0
}
func (f *fireModeImmediate) Fire(ctx context.Context, trigger Trigger, args ...any) error {
f.ops.Add(1)
defer f.ops.Add(^uint64(0))
return f.sm.internalFireOne(ctx, trigger, args...)
}
type queuedTrigger struct {
Context context.Context
Trigger Trigger
Args []any
}
type fireModeQueued struct {
firing atomic.Bool
sm *StateMachine
triggers []queuedTrigger
mu sync.Mutex // guards triggers
}
func (f *fireModeQueued) Firing() bool {
return f.firing.Load()
}
func (f *fireModeQueued) Fire(ctx context.Context, trigger Trigger, args ...any) error {
f.enqueue(ctx, trigger, args...)
for {
et, ok := f.fetch()
if !ok {
break
}
err := f.execute(et)
if err != nil {
return err
}
}
return nil
}
func (f *fireModeQueued) enqueue(ctx context.Context, trigger Trigger, args ...any) {
f.mu.Lock()
defer f.mu.Unlock()
f.triggers = append(f.triggers, queuedTrigger{Context: ctx, Trigger: trigger, Args: args})
}
func (f *fireModeQueued) fetch() (et queuedTrigger, ok bool) {
f.mu.Lock()
defer f.mu.Unlock()
if len(f.triggers) == 0 {
return queuedTrigger{}, false
}
if !f.firing.CompareAndSwap(false, true) {
return queuedTrigger{}, false
}
et, f.triggers = f.triggers[0], f.triggers[1:]
return et, true
}
func (f *fireModeQueued) execute(et queuedTrigger) error {
defer f.firing.Swap(false)
return f.sm.internalFireOne(et.Context, et.Trigger, et.Args...)
}