-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathretry_limiter.go
140 lines (122 loc) · 3.03 KB
/
retry_limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package common
/*
limiter with same names share throughput counter.
Usage:
limiter = common.NewRetryLimiter("Func1", 2, 0.1)
for limiter.CanRetry() {
// do something here
}
*/
import (
"fmt"
"sync"
"time"
)
type limiterData struct {
window int64
throughput int // throughtput in current window
lastThroughput int // throughtput in last time window
retryCount int
lock *sync.Mutex
}
var (
throughputWindow int
dataMap map[string]*limiterData
countChannel chan string
dataMapLock *sync.Mutex
)
func init() {
throughputWindow = 10
dataMap = make(map[string]*limiterData)
countChannel = make(chan string)
dataMapLock = &sync.Mutex{}
go func(countChannel chan string) {
// couunt throughput for all retries in current process
for {
writerName, ok := <-countChannel
if !ok {
break
}
dataMapLock.Lock()
if _, ok := dataMap[writerName]; !ok {
crtData := limiterData{
0, 0, 0, 0, &sync.Mutex{},
}
dataMap[writerName] = &crtData
}
crtWindow := time.Now().Unix() / int64(throughputWindow)
crtData := dataMap[writerName]
if crtWindow > crtData.window {
// entering new window
crtData.window = crtWindow
crtData.lastThroughput = crtData.throughput
crtData.throughput = 1
} else {
crtData.throughput++
}
dataMapLock.Unlock()
}
}(countChannel)
}
// InitRetryLimiter setup retry limit
func InitRetryLimiter(window int) {
throughputWindow = window
}
// StopRetryLimiter stop retry counter
func StopRetryLimiter() {
close(countChannel)
}
// RetryLimiter handler for retry limiter
type RetryLimiter struct {
name string
retryLimit int
retryCount int
retryRatio float64
}
// NewRetryLimiter create a new retry limiter, you should create a new retry limiter everytime you calls rpc
func NewRetryLimiter(name string, retryLimit int, retryRatio float64) *RetryLimiter {
countChannel <- name
return &RetryLimiter{name, retryLimit, 0, retryRatio}
}
// CanRetry call this function everytime before actuall function call
func (p *RetryLimiter) CanRetry() bool {
if p.retryCount == 0 {
// first time always success
p.retryCount = 1
return true
}
if p.retryCount >= p.retryLimit {
return false
}
dataMapLock.Lock()
crtData, ok := dataMap[p.name]
dataMapLock.Unlock()
if !ok {
p.retryCount++
return true
}
crtData.lock.Lock()
defer func() {
crtData.lock.Unlock()
}()
if crtData.lastThroughput >= 0 && float64(crtData.retryCount) > float64(crtData.lastThroughput)*p.retryRatio {
return false
}
p.retryCount++
return true
}
// Debug for debug only
func (p *RetryLimiter) Debug() string {
dataMapLock.Lock()
crtData, ok := dataMap[p.name]
if !ok {
dataMapLock.Unlock()
return fmt.Sprintf("error:%v", dataMap)
}
result := fmt.Sprintf("crtTime:%v, win: %v, name: %v, limit:%v, retry: %v, limitRatio: %v, throughput: %v, lastThroughput: %v",
time.Now().UnixNano(), crtData.window,
p.name, p.retryLimit, p.retryCount, p.retryRatio, crtData.throughput, crtData.lastThroughput,
)
dataMapLock.Unlock()
return result
}