-
Notifications
You must be signed in to change notification settings - Fork 0
/
hub.go
114 lines (97 loc) · 3.5 KB
/
hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package hub
import (
"reflect"
"sync"
"sync/atomic"
)
// Cancel is the closure returned by Subscribe for removing a subscription.
// Cancels are idempotent: only the first call has any effect.
//
// There is no requirement to cancel subscriptions. Cancellation is optional, and it is an
// expected use case for a listener to remain subscribed for the life of the application.
type Cancel func()
// Publisher is a sink for events. Publisher instances are safe for concurrent use and require
// no external concurrency control.
//
// The Publisher implementation returned by New() uses copy-on-write semantics for the set of listeners
// and a lock-free (atomic value) approach to publish. This means that publishes can run concurrently
// with one another, and callers should expect that.
//
// Publishers do not require events to be handled. If an event has no subscribers, a Publisher
// simply does nothing for that event.
type Publisher interface {
// Publish routes an arbitrary object to subscribers.
//
// Publish is synchronous, so listeners should not perform long-running tasks
// without spawning a goroutine. If any listener panics, that panic will interrupt
// event delivery and the panic will escape the call to Publish.
Publish(interface{})
}
// Subscriber provides event subscriptions for listeners. Subscriber instances are safe for concurrent use.
type Subscriber interface {
// Subscribe registers a new listener l. If l is not accepted as a listener,
// an error is returned and the returned Cancel will be nil.
//
// If supplied, the afterCancel closures will be invoked during cancellation, in the
// order given, after the listener has been deregistered. They run at most once, no
// matter how many times the Cancel is called. Typical usage for an after closure
// is closing a subscribed channel or other channel that signals cancellation.
Subscribe(l interface{}, afterCancel ...func()) (Cancel, error)
}
// Interface provides both publish and subscribe functionality by combining
// Publisher and Subscriber. It is the type returned by New.
type Interface interface {
Publisher
Subscriber
}
// New returns a fresh, empty hub that can both publish and subscribe.
//
// The returned implementation is tuned for workloads in which publishes vastly
// outnumber subscribes: typically listeners are registered once, near application
// startup, and events are published for the rest of the application's lifetime.
func New() Interface {
	return &hub{}
}
// Must returns c unchanged when err is nil and panics with err otherwise. It can
// be wrapped around a call to Subscribe to turn a registration error into a panic.
//
//	// this will panic, as return values aren't allowed
//	hub.Must(h.Subscribe(func(MyEvent) error { return nil }))
func Must(c Cancel, err error) Cancel {
	if err == nil {
		return c
	}

	panic(err)
}
// hub is the internal synchronous implementation of Interface returned by New.
// Its zero value is an empty hub that is ready to use.
type hub struct {
// subscribeLock serializes the load-modify-store sequences performed by
// Subscribe and by Cancel closures. Publish does not acquire it.
subscribeLock sync.Mutex
// subscriptions holds the current subscriptions value. It is replaced as a
// whole on every subscribe or cancel and is read through load.
subscriptions atomic.Value
}
// load returns the subscriptions currently held by the hub. If nothing has been
// stored yet, the zero value of subscriptions is returned instead.
func (h *hub) load() subscriptions {
	if current, ok := h.subscriptions.Load().(subscriptions); ok {
		return current
	}

	var none subscriptions
	return none
}
// store replaces the hub's current subscriptions with next. Both call sites in
// this file invoke it while holding subscribeLock.
//
// The parameter is deliberately not called "new", which would shadow the
// predeclared builtin of the same name inside the method.
func (h *hub) store(next subscriptions) {
	h.subscriptions.Store(next)
}
// Publish delivers e using the subscriptions that are current at the moment of
// the call. It does not acquire subscribeLock.
func (h *hub) Publish(e interface{}) {
	current := h.load()
	current.publish(e)
}
// Subscribe registers l as a listener and returns a Cancel that removes it again.
//
// l is converted by newSink into an event type and a sink. If that conversion
// fails, its error is returned together with a nil Cancel and nothing is
// registered. Any afterCancel closures are handed to the returned Cancel, which
// runs them once the listener has been removed.
func (h *hub) Subscribe(l interface{}, afterCancel ...func()) (Cancel, error) {
	eventType, s, err := newSink(l)
	if err != nil {
		return nil, err
	}

	// The lock serializes this load-modify-store sequence against other
	// subscribes and cancels. Deferring the Unlock releases the mutex even if
	// add or store panics, so a failed registration cannot leave it held.
	h.subscribeLock.Lock()
	defer h.subscribeLock.Unlock()

	h.store(h.load().add(eventType, s))
	return h.cancel(eventType, s, afterCancel...), nil
}
// cancel creates a Cancel closure that removes the (eventType, s) tuple from the
// hub's subscriptions and then invokes each afterCancel closure in order.
//
// The returned closure is idempotent: a sync.Once ensures the removal and the
// afterCancel closures run on the first call only.
func (h *hub) cancel(eventType reflect.Type, s sink, afterCancel ...func()) Cancel {
	var once sync.Once
	return func() {
		once.Do(func() {
			// The critical section lives in its own function so that the
			// deferred Unlock fires before any afterCancel closure runs, and
			// also fires if remove or store panics.
			func() {
				h.subscribeLock.Lock()
				defer h.subscribeLock.Unlock()
				h.store(h.load().remove(eventType, s))
			}()

			// Run the callbacks outside the lock, after deregistration.
			for _, f := range afterCancel {
				f()
			}
		})
	}
}