-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathmany_to_one.go
130 lines (115 loc) · 4.14 KB
/
many_to_one.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package diodes
import (
"log"
"sync/atomic"
"unsafe"
)
// ManyToOne diode is optimal for many writers (go-routines B-n) and a single
// reader (go-routine A). It is not thread safe for multiple readers.
//
// NOTE(review): writeIndex is the first field of the struct; sync/atomic only
// guarantees 64-bit alignment on 32-bit platforms for the first word of an
// allocated struct, so keep it first when adding or reordering fields.
type ManyToOne struct {
// writeIndex is the sequence number claimed by the most recent Set attempt.
// Set advances it with atomic.AddUint64.
writeIndex uint64
// buffer holds the slots of the ring. Each slot is either nil or a pointer
// to a bucket, and is accessed through the atomic pointer operations.
buffer []unsafe.Pointer
// readIndex is the sequence number TryNext expects to find next. TryNext
// reads and writes it without atomics.
readIndex uint64
// alerter is called by TryNext with the number of skipped sequence numbers
// whenever the reader has to fast forward.
alerter Alerter
}
// NewManyToOne builds a diode (ring buffer) with size slots. The ManyToOne
// diode is optimized for many writers (on go-routines B-n) and a single
// reader (on go-routine A).
//
// The alerter is invoked on the reader's go-routine when the reader notices
// that writers have passed it and overwritten unread data. Passing nil
// installs a no-op alerter, so alerts are ignored.
//
// size should be greater than zero, because Set and TryNext index the buffer
// modulo its length.
func NewManyToOne(size int, alerter Alerter) *ManyToOne {
	if alerter == nil {
		alerter = AlertFunc(func(int) {})
	}

	return &ManyToOne{
		// Begin one step before 0 (all bits set) so that the first
		// atomic.AddUint64 in Set wraps around and yields index 0.
		writeIndex: ^uint64(0),
		buffer:     make([]unsafe.Pointer, size),
		alerter:    alerter,
	}
}
// Set stores data in the next slot of the ring buffer.
//
// Each attempt claims a fresh sequence number by atomically incrementing the
// write index and maps it onto a slot with a modulo. The attempt is abandoned,
// a collision is logged, and a new sequence number is claimed when either:
//
//   - the slot already holds a bucket whose sequence number is newer than the
//     one this slot held one lap earlier (seq - len(buffer)), or
//   - the slot changed between the load and the compare-and-swap.
func (d *ManyToOne) Set(data GenericDataType) {
	size := uint64(len(d.buffer))

	for {
		seq := atomic.AddUint64(&d.writeIndex, 1)
		slot := &d.buffer[seq%size]
		prev := atomic.LoadPointer(slot)

		if prev != nil {
			// While seq < size the subtraction wraps around to a very
			// large value, so this comparison is false on the first lap.
			if occupant := (*bucket)(prev); occupant.seq > seq-size {
				log.Println("Diode set collision: consider using a larger diode")
				continue
			}
		}

		fresh := &bucket{
			data: data,
			seq:  seq,
		}

		// Publish only if the slot still holds what was loaded above.
		if atomic.CompareAndSwapPointer(slot, prev, unsafe.Pointer(fresh)) {
			return
		}

		log.Println("Diode set collision: consider using a larger diode")
	}
}
// TryNext attempts to read from the next slot of the ring buffer. If no data
// is available it returns (nil, false).
//
// The slot at the read index is atomically swapped with nil, so whatever was
// stored there is removed from the buffer regardless of the outcome below.
//
// Simulation of a writer lapping the reader in a 4-slot diode (r is the read
// index, w the number of writes so far):
//
//  1. Both the read and write heads start at 0.
//     `| nil | nil | nil | nil |` r: 0, w: 0
//  2. The writer fills the buffer.
//     `| 0 | 1 | 2 | 3 |` r: 0, w: 4
//  3. The writer laps the read head.
//     `| 4 | 5 | 2 | 3 |` r: 0, w: 6
//  4. The reader reads the first slot expecting seq 0 but finds seq 4. It
//     fast forwards to 4, reports 4 dropped entries and moves on to 5.
//     `| nil | 5 | 2 | 3 |` r: 5, w: 6
//  5. The reader reads seq 5, then reads again expecting seq 6 but finds
//     seq 2. That is a stale value that was effectively dropped, so the read
//     fails and the read head stays put.
//     `| nil | nil | nil | 3 |` r: 6, w: 6
func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) {
	slot := &d.buffer[d.readIndex%uint64(len(d.buffer))]
	entry := (*bucket)(atomic.SwapPointer(slot, nil))

	switch {
	case entry == nil:
		// No writer has stored a value in this slot yet; leave the read
		// head where it is.
		return nil, false

	case entry.seq < d.readIndex:
		// Stale value from before a fast forward (step 5 above): ignore it
		// and leave the read head where it is.
		return nil, false

	case entry.seq > d.readIndex:
		// The writer has lapped the reader (step 4 above). Move the read
		// head up to the sequence number that was found and report how
		// many sequence numbers were skipped.
		skipped := entry.seq - d.readIndex
		d.readIndex = entry.seq
		d.alerter.Alert(int(skipped)) // nolint:gosec
	}

	// Reached for a regular read (seq equal to the read index) and after a
	// fast forward (seq was greater than the read index).
	d.readIndex++
	return entry.data, true
}