-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathf_kv_server.go
147 lines (123 loc) · 3.49 KB
/
f_kv_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package main
import (
"fmt"
"github.com/charmbracelet/log"
"github.com/hashicorp/memberlist"
"gopkg.in/yaml.v2"
"os"
"strings"
)
var distKvLogger = log.WithPrefix("dist-kv")
var configLogger = log.WithPrefix("config")
type DistKVServer struct {
config *configuration
store StorageEngine
ring HashRing
node Membership
api *HttpAPIServer
client Client
}
func NewDistKVServer(config *configuration) *DistKVServer {
httpAddress := fmt.Sprintf("%s:%d", GetLocalIP(), config.ExternalPort)
node, err := NewGossipMembership(config.InternalPort, httpAddress)
if err != nil {
distKvLogger.Fatalf("Failed to create node: %v", err)
}
kv := NewMemStorageEngine()
ring := NewBoundedLoadConsistentHashRing(config.PartitionCount, config.KeyReplicationCount)
distKV := DistKVServer{
config: config,
node: node,
store: kv,
ring: ring,
}
go distKV.handleMembershipChange(node.MembershipChangeCh())
api := NewAPI(&distKV, config.ExternalPort)
distKV.api = api
distKV.client = NewHttpClient(ring, api)
return &distKV
}
func (d *DistKVServer) Bootstrap() {
// Join the gossip network
err := d.node.Join(d.config.BootstrapNodes)
if err != nil {
distKvLogger.Fatalf("Failed to join distKV: %v", err)
}
// Start the HTTP API server
d.api.Run()
}
func (d *DistKVServer) handleMembershipChange(membershipChangeCh chan memberlist.NodeEvent) {
for {
select {
case event := <-membershipChangeCh:
httpAddress := event.Node.Name
switch event.Event {
case memberlist.NodeJoin:
d.ring.AddNode(httpAddress)
d.redistributePartitions()
distKvLogger.Infof("Node joined: %s", httpAddress)
case memberlist.NodeLeave:
d.ring.RemoveNode(httpAddress)
distKvLogger.Infof("Node left: %s", httpAddress)
default:
distKvLogger.Fatalf("Unknown event: %v", event.Event)
}
}
}
}
func (d *DistKVServer) redistributePartitions() {
for partitionId := range d.store.GetShards() {
newOwner := d.ring.ResolvePartitionOwnerNode(partitionId)
if newOwner != d.api.GetAddress() {
// send to newOwner
err := d.client.PutShard(partitionId, d.store.GetShard(partitionId))
if err != nil {
distKvLogger.Fatalf("Failed to redistribute partition %d to %s: %v", partitionId, newOwner, err)
continue
}
distKvLogger.Infof("Redistributing partition %d to %s", partitionId, newOwner)
// delete from old owner
d.store.DeleteShard(partitionId)
}
}
}
// -----------------Config -------------------
const (
localIp = "127.0.0.1"
)
type configuration struct {
Host string
ExternalPort int `yaml:"ExternalPort"`
InternalPort int `yaml:"InternalPort"`
BootstrapNodes []string `yaml:"BootstrapNodes"`
PartitionCount int `yaml:"PartitionCount"`
KeyReplicationCount int `yaml:"KeyReplicationCount"`
}
func loadConfig() *configuration {
configLogger.Info("loading configurations from config.yml")
config := &configuration{
Host: "0.0.0.0",
InternalPort: 8000,
ExternalPort: 8001,
BootstrapNodes: []string{},
PartitionCount: 30,
KeyReplicationCount: 3,
}
data, err := os.ReadFile("z_config.yml")
if err != nil {
configLogger.Fatal("Cannot load config.yml")
}
err = yaml.Unmarshal(data, config)
if err != nil {
configLogger.Fatal("Failed to unmarshal config.yml")
}
for i, addr := range config.BootstrapNodes {
if strings.HasPrefix(addr, ":") {
config.BootstrapNodes[i] = GetLocalIP() + addr
}
}
return config
}
func GetLocalIP() string {
return localIp
}