-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsemaphore.go
116 lines (105 loc) · 3.04 KB
/
semaphore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package semaphore
import (
"sync/atomic"
"time"
)
// An integer-valued semaphore
type Semaphore struct {
value int64 // the value of the semaphore, accessed via atomic operations only
acquireMu chan struct{} // used as a mutex to gate goroutines that are going to sleep when acquiring the semaphore
wake chan struct{} // used to notify a sleeping goroutine that the semaphore is once more positive
}
// Creates a new semaphore with initial value n. Panics if n is negative.
func New(n int) *Semaphore {
if n < 0 {
panic("negative initial value for a semaphore")
}
ch := make(chan struct{}, 1)
ch <- struct{}{}
return &Semaphore{
value: int64(n),
acquireMu: ch,
wake: make(chan struct{}, 1),
}
}
// Tries to decrease the semaphore's value by n. Panics if n is negative. If it is smaller than n, waits until it grows
// large enough. If the cancel channel becomes readable before that happens, the request is cancelled. Returns true
// if the semaphore was decreased, false if the operation was cancelled.
func (s *Semaphore) AcquireCancellable(n int, cancel <-chan struct{}) bool {
if n < 0 {
panic("Semaphore.Acquire called with negative decrement")
}
v := atomic.LoadInt64(&s.value)
for v >= int64(n) {
if atomic.CompareAndSwapInt64(&s.value, v, v-int64(n)) {
return true
}
v = atomic.LoadInt64(&s.value)
}
select {
case <-cancel:
return false
case <-s.acquireMu:
}
defer func() {
s.acquireMu <- struct{}{}
}()
v = atomic.AddInt64(&s.value, int64(-n))
for v < 0 {
select {
case <-cancel:
atomic.AddInt64(&s.value, int64(n))
return false
case <-s.wake:
v = atomic.LoadInt64(&s.value)
}
}
return true
}
// Tries to decrease the semaphore's value by n. Panics if n is negative. If it is smaller than n, waits until it grows
// large enough.
func (s *Semaphore) Acquire(n int) {
s.AcquireCancellable(n, nil)
}
// Tries to decrease the semaphore's value by n. Panics if n is negative. If it is smaller than n, waits until it grows
// large enough or until delay has passed. Returns true on success and false on timeout.
func (s *Semaphore) TimedAcquire(n int, delay time.Duration) bool {
// Fast path: we can acquire the semaphore right now
closed := make(chan struct{})
close(closed)
if s.AcquireCancellable(n, closed) {
return true
}
cancel := make(chan struct{})
timer := time.AfterFunc(delay, func() {
close(cancel)
})
defer timer.Stop()
return s.AcquireCancellable(n, cancel)
}
// Increases the semaphore's value by n. Panics if n is negative. Will never sleep.
func (s *Semaphore) Release(n int) {
if n < 0 {
panic("Semaphore.Release called with negative increment")
}
v := atomic.AddInt64(&s.value, int64(n))
if v-int64(n) < 0 && v >= 0 {
select {
case s.wake <- struct{}{}:
default:
}
}
}
// Decreases the semaphore value to 0 and returns the difference. Will never sleep.
func (s *Semaphore) Drain() int {
for {
v := atomic.LoadInt64(&s.value)
if v <= 0 {
return 0
}
if atomic.CompareAndSwapInt64(&s.value, v, 0) {
return int(v)
}
}
panic("unreachable")
}