-
Notifications
You must be signed in to change notification settings - Fork 14
/
throttle.go
107 lines (91 loc) · 2.4 KB
/
throttle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package iocontrol
import (
"io"
"time"
)
// Throttler is implemented by values whose allowed throughput can be changed
// after they have been created.
type Throttler interface {
	// SetRate changes the rate at which the throttler allows reads or writes.
	SetRate(perSec int)
}

// ThrottlerReader is an io.Reader whose rate can be adjusted through the
// Throttler interface.
type ThrottlerReader interface {
	io.Reader
	Throttler
}

// ThrottlerWriter is an io.Writer whose rate can be adjusted through the
// Throttler interface.
type ThrottlerWriter interface {
	io.Writer
	Throttler
}
// ThrottledReader wraps r so that reads from it never exceed bytesPerSec bytes
// per second. maxBurst controls how often the rate is verified: a smaller
// value gives less bursty reads, at the cost of more throttling overhead.
func ThrottledReader(r io.Reader, bytesPerSec int, maxBurst time.Duration) ThrottlerReader {
	throttled := new(throttledReader)
	throttled.wrap = r
	throttled.limiter = newRateLimiter(bytesPerSec, maxBurst)
	return throttled
}
// throttledReader is the value returned by ThrottledReader. It pairs the
// reader being throttled with the limiter that Read and SetRate consult.
type throttledReader struct {
// wrap is the underlying reader that Read pulls bytes from.
wrap io.Reader
// limiter reports how many bytes may currently be read and records reads.
limiter *rateLimiter
}
// Read reads from the wrapped reader into b without consuming more bytes than
// the limiter currently allows.
//
// If b fits within the allowance reported by the limiter's CanDo, the read is
// passed straight through and the byte count is recorded with Did. Otherwise
// at most the allowed number of bytes is read, the limiter's Limit is called
// (presumably blocking until more allowance is available; confirm against
// rateLimiter), and the bytes read so far are returned so the caller can issue
// another Read. Read may therefore return (0, nil) when no allowance was left.
//
// An error from the wrapped reader, including io.EOF, is returned right away
// together with the bytes read. Those bytes are recorded with Did, as on the
// unthrottled path, and Limit is not called first, matching how Write treats
// errors.
func (t *throttledReader) Read(b []byte) (n int, err error) {
	canRead := t.limiter.CanDo()
	if len(b) <= canRead {
		// no throttling needed
		n, err = t.wrap.Read(b)
		t.limiter.Did(n)
		return n, err
	}

	if canRead > 0 {
		// read what can be read for this batch
		n, err = t.wrap.Read(b[:canRead])
		if err != nil {
			// Do not make the caller wait on the limiter just to learn the
			// stream ended or failed; still account for what was consumed so
			// a retry after a transient error cannot exceed the allowance.
			t.limiter.Did(n)
			return n, err
		}
	}

	t.limiter.Limit()
	// return bytes read and let caller try another read
	return n, nil
}
// SetRate changes the rate at which the throttled reader allows reads by
// forwarding perSec to the reader's limiter.
// NOTE(review): perSec is presumably bytes per second, matching the
// bytesPerSec argument of ThrottledReader; confirm against rateLimiter.SetRate.
func (t *throttledReader) SetRate(perSec int) {
t.limiter.SetRate(perSec)
}
// ThrottledWriter wraps w so that writes to it never exceed bytesPerSec bytes
// per second. maxBurst controls how often the rate is verified: a smaller
// value gives less bursty writes, at the cost of more throttling overhead.
func ThrottledWriter(w io.Writer, bytesPerSec int, maxBurst time.Duration) ThrottlerWriter {
	throttled := new(throttledWriter)
	throttled.wrap = w
	throttled.limiter = newRateLimiter(bytesPerSec, maxBurst)
	return throttled
}
// throttledWriter is the value returned by ThrottledWriter. It pairs the
// writer being throttled with the limiter that Write and SetRate consult.
type throttledWriter struct {
// wrap is the underlying writer that Write sends bytes to.
wrap io.Writer
// limiter reports how many bytes may currently be written and records writes.
limiter *rateLimiter
}
// Write writes b to the wrapped writer in chunks no larger than the allowance
// reported by the limiter's CanDo, calling the limiter's Limit between chunks
// (presumably blocking until more allowance is available; confirm against
// rateLimiter).
//
// It returns once all of b has been handed to the wrapped writer or the
// wrapped writer reports an error; n is the total number of bytes the wrapped
// writer accepted. The final chunk, which fits in the remaining allowance, is
// recorded with Did.
//
// A chunk is written only when the allowance is positive. A zero allowance
// therefore does not produce an empty Write on the wrapped writer, and a
// negative allowance cannot cause an out-of-range slice.
func (t *throttledWriter) Write(b []byte) (n int, err error) {
	var m int
	for {
		canWrite := t.limiter.CanDo()
		pending := b[n:]
		if len(pending) <= canWrite {
			// no throttling needed
			m, err = t.wrap.Write(pending)
			n += m
			t.limiter.Did(m)
			return n, err
		}

		if canWrite > 0 {
			// write what can be written for this batch
			m, err = t.wrap.Write(pending[:canWrite])
			n += m
			if err != nil {
				return n, err
			}
		}

		t.limiter.Limit()
	}
}
// SetRate changes the rate at which the throttled writer allows writes by
// forwarding perSec to the writer's limiter.
// NOTE(review): perSec is presumably bytes per second, matching the
// bytesPerSec argument of ThrottledWriter; confirm against rateLimiter.SetRate.
func (t *throttledWriter) SetRate(perSec int) {
t.limiter.SetRate(perSec)
}