Skip to content

Commit

Permalink
feat: add raft from etcd key value store example
Browse files Browse the repository at this point in the history
  • Loading branch information
PapePathe committed Apr 6, 2024
1 parent 33a3bf5 commit e3bb76e
Show file tree
Hide file tree
Showing 11 changed files with 1,201 additions and 53 deletions.
4 changes: 4 additions & 0 deletions Procfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Use goreman to run `go install github.com/mattn/goreman@latest`
raftexample1: ./raftexample --id 1 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 12380 --pedis 127.0.0.1:6379
raftexample2: ./raftexample --id 2 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 22380 --pedis 127.0.0.1:6389
raftexample3: ./raftexample --id 3 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 32380 --pedis 127.0.0.1:6399
3 changes: 2 additions & 1 deletion cmd/client/main.go → cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ func main() {
Password: "",
DB: 0,
})
for i := 0; i < 10; i++ {

for i := 0; i < 2; i++ {
key := fmt.Sprintf("key-%d", i)
err := client.Set(context.Background(), key, "value", 0).Err()
if err != nil {
Expand Down
26 changes: 25 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@ require (

require (
github.com/armon/go-metrics v0.4.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/go-playground/assert/v2 v2.2.0 // indirect
github.com/go-redis/redis v6.15.9+incompatible // indirect
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-hclog v1.6.2 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
Expand All @@ -29,13 +34,32 @@ require (
github.com/hashicorp/serf v0.10.1 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/miekg/dns v1.1.41 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.32.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.11.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/redis/go-redis/v9 v9.5.1 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/etcd/api/v3 v3.6.0-alpha.0 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.6.0-alpha.0 // indirect
go.etcd.io/etcd/pkg/v3 v3.6.0-alpha.0 // indirect
go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0 // indirect
go.etcd.io/etcd/server/v3 v3.6.0-alpha.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.17.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect
google.golang.org/grpc v1.41.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
168 changes: 168 additions & 0 deletions go.sum

Large diffs are not rendered by default.

39 changes: 28 additions & 11 deletions internal/storage/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,33 @@ import (
"time"
)

type datatype struct {
t rune
data []byte
type StorageData struct {
K string
T rune
D []byte
}

type SimpleStorage struct {
data map[string]datatype
exp map[string]time.Time
expLock sync.RWMutex
data map[string]StorageData
exp map[string]time.Time
proposeChan chan StorageData
expLock sync.RWMutex
sync.RWMutex
}

func NewSimpleStorage() *SimpleStorage {
func NewSimpleStorage(proposeChan chan StorageData) *SimpleStorage {
return &SimpleStorage{
data: make(map[string]datatype),
exp: make(map[string]time.Time),
data: make(map[string]StorageData),
exp: make(map[string]time.Time),
proposeChan: proposeChan,
}
}

// Endpoint for redis SET command
func (ss *SimpleStorage) Set(key string, value string, expires int64) error {
go ss.proposeRaftChange(StorageData{T: 's', D: []byte(value), K: key})
ss.Lock()
ss.data[key] = datatype{t: 's', data: []byte(value)}
ss.data[key] = StorageData{T: 's', D: []byte(value)}
ss.Unlock()

if expires > 0 {
Expand All @@ -49,5 +54,17 @@ func (ss *SimpleStorage) Get(key string) (string, error) {
return "", errors.New("key not found")
}

return string(v.data), nil
return string(v.D), nil
}

func (ss *SimpleStorage) WriteFromRaft(d StorageData) error {
ss.Lock()
ss.data[d.K] = StorageData{T: d.T, D: []byte(d.D)}
ss.Unlock()

return nil
}

func (ss *SimpleStorage) proposeRaftChange(data StorageData) {
ss.proposeChan <- data
}
4 changes: 4 additions & 0 deletions internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,8 @@ package storage
type Storage interface {
Set(key string, value string, expires int64) error
Get(key string) (string, error)

// Raft related methods

WriteFromRaft(StorageData) error
}
76 changes: 36 additions & 40 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,52 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"flag"
"log"
"pedis/node"
"pedis/internal/storage"
"pedis/praft"
"strings"
)

var addr string
var raftAddr string
var joinAddr string
var startJoinAddrs string
var id string
var membershipAddr string
var bootstrap bool

func init() {
flag.StringVar(&membershipAddr, "serf-addr", "localhost:6359", "the address where the cluster membership server is listening")
flag.StringVar(&raftAddr, "raft-addr", "localhost:6389", "the address where the raft server is listening")
flag.StringVar(&addr, "addr", "localhost:6379", "the address where the server will listen")
flag.StringVar(&id, "id", "primary", "the unique id of the server in the cluster")
flag.StringVar(&joinAddr, "join-addr", "", "set address of the leader of the cluster")
flag.StringVar(&startJoinAddrs, "sjoin-addr", "", "set addresses of cluster members to join when starting")
flag.BoolVar(&bootstrap, "bootstrap", false, "start as bootstrap node")
}
"go.etcd.io/etcd/raft/v3/raftpb"
)

func main() {
cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
id := flag.Int("id", 1, "node ID")
kvport := flag.Int("port", 9121, "key-value server port")
join := flag.Bool("join", false, "join an existing cluster")
pedis := flag.String("pedis", ":6379", "port where pedis server is running")
flag.Parse()

joinAddrs := []string{}
proposeC := make(chan string)
defer close(proposeC)

if startJoinAddrs != "" {
joinAddrs = strings.Split(startJoinAddrs, ",")
}
confChangeC := make(chan raftpb.ConfChange)
defer close(confChangeC)

node, err := node.NewNode(node.Config{
Bootstrap: bootstrap,
JoinAddr: joinAddr,
RaftAddr: raftAddr,
MembershipAddr: membershipAddr,
ServerAddr: addr,
ServerId: id,
StartJoinAddrs: joinAddrs,
})
storageProposeChan := make(chan storage.StorageData)
defer close(storageProposeChan)

if err != nil {
log.Fatalf("error creating node %v", err)
}
// raft provides a commit stream for the proposals from the http api
var kvs *praft.PedisServer
getSnapshot := func() ([]byte, error) { return kvs.GetSnapshot() }
commitC, errorC, snapshotterReady := praft.NewRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

err = node.Start()
kvs = praft.NewKVStore(<-snapshotterReady, proposeC, commitC, errorC, storage.NewSimpleStorage(storageProposeChan), *pedis, storageProposeChan)

if err != nil {
log.Fatalf("error starting pedis node %v", err)
}
// the key-value http handler will propose updates to raft
praft.ServeHTTPKVAPI(kvs, *kvport, confChangeC, errorC)
}
128 changes: 128 additions & 0 deletions praft/httpapi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package praft

import (
"io"
"log"
"net/http"
"strconv"

"go.etcd.io/etcd/raft/v3/raftpb"
)

// Handler for a http based key-value store backed by raft
type httpKVAPI struct {
store *PedisServer
confChangeC chan<- raftpb.ConfChange
}

func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := r.RequestURI
defer r.Body.Close()
switch r.Method {
case http.MethodPut:
v, err := io.ReadAll(r.Body)
if err != nil {
log.Printf("Failed to read on PUT (%v)\n", err)
http.Error(w, "Failed on PUT", http.StatusBadRequest)
return
}

log.Printf("command change (%v)\n", v)
// h.store.Propose(key, string(v))

// Optimistic-- no waiting for ack from raft. Value is not yet
// committed so a subsequent GET on the key may return old value
w.WriteHeader(http.StatusNoContent)
case http.MethodGet:
if v, ok := h.store.Lookup(key); ok {
w.Write([]byte(v))
} else {
http.Error(w, "Failed to GET", http.StatusNotFound)
}
case http.MethodPost:
url, err := io.ReadAll(r.Body)
if err != nil {
log.Printf("Failed to read on POST (%v)\n", err)
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}

nodeID, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {
log.Printf("Failed to convert ID for conf change (%v)\n", err)
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}

cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: nodeID,
Context: url,
}
h.confChangeC <- cc
// As above, optimistic that raft will apply the conf change
w.WriteHeader(http.StatusNoContent)
case http.MethodDelete:
nodeID, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {
log.Printf("Failed to convert ID for conf change (%v)\n", err)
http.Error(w, "Failed on DELETE", http.StatusBadRequest)
return
}

cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeID,
}
h.confChangeC <- cc

// As above, optimistic that raft will apply the conf change
w.WriteHeader(http.StatusNoContent)
default:
w.Header().Set("Allow", http.MethodPut)
w.Header().Add("Allow", http.MethodGet)
w.Header().Add("Allow", http.MethodPost)
w.Header().Add("Allow", http.MethodDelete)
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}

// serveHTTPKVAPI starts a key-value server with a GET/PUT API and listens.
func ServeHTTPKVAPI(kv *PedisServer, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
srv := http.Server{
Addr: ":" + strconv.Itoa(port),
Handler: &httpKVAPI{
store: kv,
confChangeC: confChangeC,
},
}
go func() {
if err := srv.ListenAndServe(); err != nil {
log.Fatal(err)
}
}()

go func() {
if err := kv.StartPedis(); err != nil {
log.Fatal(err)
}
}()
// exit when raft goes down
if err, ok := <-errorC; ok {
log.Fatal(err)
}
}
Loading

0 comments on commit e3bb76e

Please sign in to comment.