-
Notifications
You must be signed in to change notification settings - Fork 245
/
taskset.go
117 lines (96 loc) · 2.6 KB
/
taskset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package boomer
import (
"sync"
)
// TaskSet is an experimental feature, the API is not stabilized.
// It needs to be more considered and tested.
type TaskSet interface {
// Add a Task to the TaskSet.
AddTask(task *Task)
// Set the weight of the TaskSet.
SetWeight(weight int)
// Get the weight of the TaskSet.
GetWeight() (weight int)
// Run will pick up a Task from the TaskSet and run.
Run()
}
// roundRobinTask is used by SmoothRoundRobinTaskSet.
type roundRobinTask struct {
Task
currentWeight int
effectiveWeight int
}
func newRoundRobinTask(task *Task) *roundRobinTask {
rrTask := &roundRobinTask{}
rrTask.Fn = task.Fn
rrTask.Weight = task.Weight
rrTask.Name = task.Name
rrTask.currentWeight = 0
rrTask.effectiveWeight = task.Weight
return rrTask
}
// Smooth weighted round-robin balancing algorithm as seen in Nginx.
// See aslo: https://github.com/linnik/roundrobin/blob/master/roundrobin/smooth_rr.py
type SmoothRoundRobinTaskSet struct {
// The weight of taskset
weight int
rrTasks []*roundRobinTask
lock sync.RWMutex
}
// NewSmoothRoundRobinTaskSet returns a new SmoothRoundRobinTaskSet.
func NewSmoothRoundRobinTaskSet() *SmoothRoundRobinTaskSet {
return &SmoothRoundRobinTaskSet{
weight: 0,
rrTasks: make([]*roundRobinTask, 0),
}
}
// AddTask add a Task to the Smooth RoundRobin TaskSet.
// If the task's weight is <= 0, it will be ignored.
func (ts *SmoothRoundRobinTaskSet) AddTask(task *Task) {
if task.Weight <= 0 {
return
}
rrTask := newRoundRobinTask(task)
ts.rrTasks = append(ts.rrTasks, rrTask)
}
func (ts *SmoothRoundRobinTaskSet) GetTask() (task *Task) {
rrTasksLength := len(ts.rrTasks)
if rrTasksLength == 0 {
return nil
}
if rrTasksLength == 1 {
return &ts.rrTasks[0].Task
}
totalWeight := 0
var picked *roundRobinTask
ts.lock.Lock()
for _, rrTask := range ts.rrTasks {
rrTask.currentWeight += rrTask.effectiveWeight
totalWeight += rrTask.effectiveWeight
if rrTask.effectiveWeight < rrTask.Weight {
rrTask.effectiveWeight += 1
}
if picked == nil || picked.currentWeight < rrTask.currentWeight {
picked = rrTask
}
}
picked.currentWeight -= totalWeight
ts.lock.Unlock()
return &picked.Task
}
// SetWeight sets the weight of the task set.
func (ts *SmoothRoundRobinTaskSet) SetWeight(weight int) {
ts.weight = weight
}
// GetWeight returns the weight of the task set.
func (ts *SmoothRoundRobinTaskSet) GetWeight() (weight int) {
return ts.weight
}
// Run will pick up a task in the task set smoothly and run.
// It can is used as a Task.Fn.
func (ts *SmoothRoundRobinTaskSet) Run() {
task := ts.GetTask()
if task != nil {
task.Fn()
}
}