-
Notifications
You must be signed in to change notification settings - Fork 6
/
balancer.go
60 lines (53 loc) · 1.48 KB
/
balancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main
import (
"fmt"
"log"
"reflect"
"runtime"
"strings"
)
// RebalanceConfig contains the configuration that drives the rebalancing.
// Balance hands a copy of it to every rebalancing step it runs.
type RebalanceConfig struct {
	// AllowLeaderRebalancing defaults to false in DefaultRebalanceConfig.
	// NOTE(review): presumably enables moving partition leaders between
	// brokers; the consuming code is not in this file - confirm.
	AllowLeaderRebalancing bool
	// MinReplicasForRebalancing defaults to 2 in DefaultRebalanceConfig.
	// NOTE(review): presumably the minimum replica count a partition needs
	// before it is considered for rebalancing - confirm against the steps.
	MinReplicasForRebalancing int
	// MinUnbalance defaults to 0.00001 in DefaultRebalanceConfig.
	// NOTE(review): presumably a threshold on the measured unbalance below
	// which no change is proposed - confirm against the steps.
	MinUnbalance float64
	// Brokers is a list of broker IDs; DefaultRebalanceConfig leaves it nil.
	// NOTE(review): how the steps interpret this list is not visible here.
	Brokers []BrokerID
}
// DefaultRebalanceConfig builds the RebalanceConfig used when nothing is
// overridden. These values are also the ones used as the default values for
// the CLI flags. Brokers is left at its zero value (nil).
func DefaultRebalanceConfig() RebalanceConfig {
	var defaults RebalanceConfig
	defaults.AllowLeaderRebalancing = false
	defaults.MinReplicasForRebalancing = 2
	defaults.MinUnbalance = 0.00001
	return defaults
}
// steps is the ordered list of rebalancing steps run by Balance. Each step
// receives the partition list and the configuration and returns an error, a
// non-nil PartitionList, or (nil, nil). Balance walks the slice front to back
// and stops at the first error or non-nil result, so the position of an entry
// determines its priority.
var steps = []func(*PartitionList, RebalanceConfig) (*PartitionList, error){
	ValidateWeights,
	ValidateReplicas,
	FillDefaults,
	RemoveExtraReplicas,
	AddMissingReplicas,
	MoveDisallowedReplicas,
	MoveLeaders,
	MoveNonLeaders,
}
// Balance analyzes the workload distribution among brokers for the
// partitions listed in the argument. It returns a PartitionList with 0 or more
// partition reassignments.
//
// The entries of steps are run in order, each with pl and cfg:
//   - the first step that returns an error aborts the run; Balance returns nil
//     and that error wrapped with the step's name, so callers can still
//     inspect the cause with errors.Is / errors.As;
//   - the first step that returns a non-nil PartitionList ends the run; the
//     list is logged and returned as-is;
//   - if every step returns (nil, nil), Balance logs "no candidate changes"
//     and returns the result of emptypl().
func Balance(pl *PartitionList, cfg RebalanceConfig) (*PartitionList, error) {
	for _, step := range steps {
		// Recover the step's function name for the error and log messages;
		// the "main." package qualifier is stripped to keep them short.
		stepFunc := runtime.FuncForPC(reflect.ValueOf(step).Pointer())
		stepName := strings.TrimPrefix(stepFunc.Name(), "main.")

		ppl, err := step(pl, cfg)
		if err != nil {
			// %w (instead of %s) keeps the step's error in the chain; the
			// rendered message is unchanged.
			return nil, fmt.Errorf("%s: %w", stepName, err)
		}
		if ppl != nil {
			log.Printf("%s: %v", stepName, ppl)
			return ppl, nil
		}
	}

	log.Print("no candidate changes")
	return emptypl(), nil
}