diff --git a/.ko.yml b/.ko.yml new file mode 100644 index 0000000..081ce71 --- /dev/null +++ b/.ko.yml @@ -0,0 +1,4 @@ +defaultPlatforms: + - linux/arm64 + - linux/amd64 + - linux/arm64/v8 diff --git a/Dockerfile b/Dockerfile index 4e7c963..a9f19fa 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,8 +24,8 @@ COPY --from=build-stage /pedis/pedis /pedis EXPOSE 6379 EXPOSE 12380 -EXPOSE 1237 +EXPOSE 12379 # USER nonroot:nonroot -ENTRYPOINT /pedis -id $ID -pedis $PEDIS -cluster $CLUSTER -port $PORT +ENTRYPOINT /pedis -id $ID -pedis $PEDIS -cluster $CLUSTER -port $PORT -join $JOIN diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d4bd6d1 --- /dev/null +++ b/Makefile @@ -0,0 +1,17 @@ +clean: + rm -rf /tmp/pedis* + +build-image: + ko build + +start-primary: + go run main.go --cluster http://127.0.0.1:12379 + +start-secondary: + go run main.go --id 2 --join --pedis 127.0.0.1:6389 --cluster http://127.0.0.1:12379,http://127.0.0.1:12380 + +start-tertiary: + go run main.go --id 3 --pedis 127.0.0.1:6390 --cluster http://127.0.0.1:12379,http://127.0.0.1:12380,http://127.0.0.1:12381 + +test: + go test -v ./... -race diff --git a/cmd/client.go b/cmd/client.go index 0231fee..5ee84b4 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -2,20 +2,26 @@ package main import ( "context" + "flag" "fmt" "log" "github.com/redis/go-redis/v9" ) +var reqCount int + func main() { + flag.IntVar(&reqCount, "req", 100, "") + flag.Parse() + client := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", DB: 0, }) - for i := 0; i < 2; i++ { + for i := 0; i < reqCount; i++ { key := fmt.Sprintf("key-%d", i) err := client.Set(context.Background(), key, "value", 0).Err() if err != nil { @@ -29,5 +35,19 @@ func main() { } log.Println("Get result", result) + + key = fmt.Sprintf("hset:key-%d", i) + err = client.HSet(context.Background(), key, "name", "pathe").Err() + if err != nil { + log.Println(err) + } + + resultHset, err := client.HGet(context.Background(), key, "name").Result() + + if err != nil { + log.Println(err) + } + + log.Println("Hset result", resultHset) } } diff --git a/docker-compose.yml b/docker-compose.yml index 688420b..87c3e0b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,41 +1,28 @@ +version: "3.4" +x-app: &default-pedis + image: ko.local/ppathe/pedis/pedis-9e6a4042beefc3cd7105eec240e5b3ab:c3cb180cc2aba05bc6202089b4d76c334f18c7a2a5fdf9e2f53691d1bfcce727 + healthcheck: + test: /ko-app/pedis --health + interval: 1s + retries: 5 + start_period: 1s + timeout: 10s services: - ## memtier: - ## image: redis - ## command: redis-benchmark -h pedis-a - ## depends_on: - ## - pedis-a - ## - memtier: - image: redislabs/memtier_benchmark - command: memtier_benchmark -s pedis-a -p 6379 -R -n allkeys -d 50 -n 10 --command="hset __key__ name Pathe country Senegal" --command-ratio=2 --command-key-pattern=G pedis-a: - build: . - environment: - ID: 1 - PEDIS: :6379 - PORT: 12380 - CLUSTER: http://pedis-a:12379,http://pedis-b:12379,http://pedis-c:12379 + <<: *default-pedis + container_name: pedis-a + entrypoint: /ko-app/pedis --cluster http://pedis-a:12379 ports: - "6379:6379" pedis-b: - environment: - ID: 2 - PEDIS: :6379 - PORT: 12380 - CLUSTER: http://pedis-a:12379,http://pedis-b:12379,http://pedis-c:12379 - build: . + <<: *default-pedis + container_name: pedis-b + entrypoint: /ko-app/pedis --id 2 --join --cluster http://pedis-a:12379,http://pedis-b:12379 depends_on: - pedis-a - ports: - - "6389:6389" pedis-c: - environment: - ID: 3 - PEDIS: :6379 - PORT: 12380 - CLUSTER: http://pedis-a:12379,http://pedis-b:12379,http://pedis-c:12379 - build: . + <<: *default-pedis + container_name: pedis-c + entrypoint: /ko-app/pedis --id 3 --join --cluster http://pedis-a:12379,http://pedis-b:12379,http://pedis-c:12379 depends_on: - pedis-a - ports: - - "6399:6399" diff --git a/internal/commands/client_request.go b/internal/commands/client_request.go index d2bd2d2..6a7f553 100644 --- a/internal/commands/client_request.go +++ b/internal/commands/client_request.go @@ -3,13 +3,13 @@ package commands import ( "bytes" "fmt" - "log" "net" "pedis/internal/renderer" "pedis/internal/storage" "strings" "github.com/rs/zerolog" + "go.etcd.io/etcd/raft/v3/raftpb" ) type IClientRequest interface { @@ -47,8 +47,6 @@ func (r RawRequest) ReadArray() []string { sl := SliceAsChunks(items[3:], 2) array := []string{} - log.Println(sl) - for _, i := range sl { if len(i) == 2 { array = append(array, string(i[1])) @@ -59,11 +57,12 @@ func (r RawRequest) ReadArray() []string { } type ClientRequest struct { - Conn net.Conn - Data [][]byte - DataRaw RawRequest - Store storage.Storage - Logger zerolog.Logger + Conn net.Conn + Data [][]byte + DataRaw RawRequest + Store storage.Storage + Logger zerolog.Logger + ClusterChangesChan chan<- raftpb.ConfChange } func (c ClientRequest) WriteError(s string) error { @@ -106,6 +105,10 @@ func (c ClientRequest) WriteNumber(s string) error { } func (c ClientRequest) WriteOK() error { + _, err := c.Conn.Write([]byte("+OK\r\n")) + if err != nil { + return fmt.Errorf("net write error (%v)", err) + } return nil } @@ -117,6 +120,11 @@ func (c ClientRequest) WriteNil() error { return nil } -func (c ClientRequest) Write([]byte) (int, error) { - return 0, nil +func (c ClientRequest) Write(data []byte) (int, error) { + n, err := c.Conn.Write(data) + if err != nil { + return 0, fmt.Errorf("net write error (%v)", err) + } + + return n, nil } diff --git a/internal/commands/command.go b/internal/commands/command.go index b769c2b..e450597 100644 --- a/internal/commands/command.go +++ b/internal/commands/command.go @@ -15,15 +15,28 @@ func NewRequestHandler() *RequestHandler { func DefaultRequestHandler() *RequestHandler { subcommands := map[string]CommandHandler{ - "del": DelHandler, + // cluster commands + "cluster": ClusterHandler{}, + + // acl commands + "acl": AclHandler{}, + + "del": DelHandler{}, + "get": GetHandler{}, + "set": SetHandler{}, + // hash related commands - "hexists": HExistsHandler, - "hget": HGetHandler, - "hkeys": HKeysHandler, - "hlen": HLenHandler, - "hset": HSetHandler, - "hvals": HValsHandler, - "config": ConfigHandler, + "hexists": HExistsHandler{}, + "hget": HGetHandler{}, + "hkeys": HKeysHandler{}, + "hlen": HLenHandler{}, + "hset": HSetHandler{}, + "hvals": HValsHandler{}, + + "config": ConfigHandler{}, + "hello": HelloHandler{}, + "auth": AuthHandler{}, + "ping": PingHandler{}, } return &RequestHandler{subcommands} } @@ -32,21 +45,19 @@ func (s RequestHandler) Run(request ClientRequest) { subcommand := strings.ToLower(string(request.Data[2])) if h, ok := s.subcommands[subcommand]; ok { - go h(request) + if err := h.Authorize(request); err != nil { + request.WriteError("not authorized to access command") + return + } + go h.Handle(request) } else { switch subcommand { - case "hello": - go HelloHandler(request.Data, request.Store, request.Conn) - case "get": - go GetHandler(request.Data, request.Store, request.Conn) - case "set": - go SetHandler(request.Data, request.Store, request.Conn) case "client": request.WriteString("OK") request.Logger.Debug().Msg("going to execute client options command") default: - request.WriteError(fmt.Sprintf("command not supported %v", string(request.Data[2]))) - request.Logger.Debug().Str("command", string(request.Data[2])).Msg("is not yet supported") + request.WriteError(fmt.Sprintf("command not supported %v", subcommand)) + request.Logger.Debug().Str("command", subcommand).Msg("is not yet supported") } } } diff --git a/internal/commands/handler.go b/internal/commands/handler.go index 0e9d2a6..8066b16 100644 --- a/internal/commands/handler.go +++ b/internal/commands/handler.go @@ -1,3 +1,12 @@ package commands -type CommandHandler func(ClientRequest) +type CommandHandler interface { + // Runs the request and write response to client + Handle(ClientRequest) + // Checks that calling user has the permissions required by the command + Authorize(ClientRequest) error + // Returns the list of permissions required to run the command + Permissions() []string + // Returns true if the command is going to persist data + Persistent() bool +} diff --git a/internal/commands/handler_acl.go b/internal/commands/handler_acl.go new file mode 100644 index 0000000..a2f7e13 --- /dev/null +++ b/internal/commands/handler_acl.go @@ -0,0 +1,105 @@ +package commands + +import ( + "fmt" + "pedis/internal/storage" +) + +type AclHandler struct{} + +func (ch AclHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch AclHandler) Permissions() []string { + return nil +} + +func (ch AclHandler) Persistent() bool { + return false +} + +func (ch AclHandler) Handle(r ClientRequest) { + data := r.DataRaw.ReadArray() + r.Logger. + Debug(). + Interface("Data", data). + Interface("RawData", r.DataRaw.String()). + Msg("") + + svc := aclService{} + + switch data[0] { + case "setuser": + _ = svc.setuser(r) + case "deluser": + _ = svc.deluser(r) + case "users": + _ = svc.users(r) + default: + r.WriteError(fmt.Sprintf("(%s) not implemented by devin", data[0])) + } +} + +type aclService struct{} + +func (aclService) deluser(r ClientRequest) error { + data := r.DataRaw.ReadArray() + r.Logger.Debug().Interface("usernames", data[1:]).Msg("Going to delete") + delCount := 0 + + for _, u := range data[1:] { + err := r.Store.DelUser(u) + + if err == nil { + delCount++ + } + } + + r.WriteNumber(fmt.Sprintf("%d", delCount)) + + return nil +} + +func (aclService) setuser(r ClientRequest) error { + data := r.DataRaw.ReadArray() + r.Logger.Debug().Msg("Going to create or update existing user") + username := data[1] + rules := []storage.AclRule{} + + if len(data) >= 2 { + for _, elem := range data[2:] { + switch elem { + case "on": + rules = append(rules, storage.AclRule{Type: storage.AclActivateUser}) + case "off": + rules = append(rules, storage.AclRule{Type: storage.AclDisableUser}) + case "nopass": + rules = append(rules, storage.AclRule{Type: storage.AclDisableUser}) + case "reset": + rules = append(rules, storage.AclRule{Type: storage.AclResetUser}) + default: + switch elem[0] { + case '>': + rules = append(rules, storage.AclRule{Type: storage.AclSetUserPassword, Value: elem[1 : len(elem)-1]}) + default: + r.WriteError(fmt.Sprintf("acl rule (%s) not supported", elem)) + } + } + } + } + + _ = r.Store.SetUser(username, rules) + r.WriteOK() + return nil +} + +func (aclService) users(r ClientRequest) error { + r.Logger.Debug().Msg("Going to list users") + + users := r.Store.Users() + r.Logger.Debug().Interface("Users list", users).Msg("") + r.WriteArray(users) + + return nil +} diff --git a/internal/commands/handler_auth.go b/internal/commands/handler_auth.go new file mode 100644 index 0000000..7a0997b --- /dev/null +++ b/internal/commands/handler_auth.go @@ -0,0 +1,43 @@ +package commands + +type AuthHandler struct{} + +func (ch AuthHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch AuthHandler) Permissions() []string { + return nil +} + +func (ch AuthHandler) Persistent() bool { + return false +} + +func (ch AuthHandler) Handle(r ClientRequest) { + data := r.DataRaw.ReadArray() + r.Logger.Info().Interface("auth params", data).Msg("") + + user, err := r.Store.GetUser(data[0]) + if err != nil { + r.WriteError(err.Error()) + return + } + + if user.AnyPassword { + r.WriteOK() + return + } + + if len(data) == 1 { + r.WriteError("Password must be supplied") + return + } + + if err := user.Authenticate(data[1]); err != nil { + r.WriteError(err.Error()) + return + } + + r.WriteOK() +} diff --git a/internal/commands/handler_cluster.go b/internal/commands/handler_cluster.go new file mode 100644 index 0000000..6d1e4af --- /dev/null +++ b/internal/commands/handler_cluster.go @@ -0,0 +1,60 @@ +package commands + +import ( + "log" + "strconv" + + "go.etcd.io/etcd/raft/v3/raftpb" +) + +type ClusterHandler struct{} + +func (ch ClusterHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch ClusterHandler) Permissions() []string { + return nil +} + +func (ch ClusterHandler) Persistent() bool { + return false +} + +func (ch ClusterHandler) Handle(r ClientRequest) { + data := r.DataRaw.ReadArray() + log.Println(data) + + switch string(data[0]) { + case "forget": + id, err := strconv.Atoi(data[1]) + + if err != nil { + r.WriteError(err.Error()) + } + cc := raftpb.ConfChange{ + Type: raftpb.ConfChangeRemoveNode, + NodeID: uint64(id), + } + r.WriteOK() + r.ClusterChangesChan <- cc + + case "meet": + id, err := strconv.Atoi(data[1]) + + if err != nil { + r.WriteError(err.Error()) + } + + cc := raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: uint64(id), + Context: []byte(data[2]), + } + r.WriteOK() + r.ClusterChangesChan <- cc + default: + r.WriteError("subcommand not found") + } + +} diff --git a/internal/commands/handler_config.go b/internal/commands/handler_config.go index b63a75d..ed96ba0 100644 --- a/internal/commands/handler_config.go +++ b/internal/commands/handler_config.go @@ -1,5 +1,19 @@ package commands -func ConfigHandler(r ClientRequest) { +type ConfigHandler struct{} + +func (ch ConfigHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch ConfigHandler) Permissions() []string { + return nil +} + +func (ch ConfigHandler) Persistent() bool { + return false +} + +func (ch ConfigHandler) Handle(r ClientRequest) { r.WriteError("not yet implemented") } diff --git a/internal/commands/handler_del.go b/internal/commands/handler_del.go index 2a29175..77d3142 100644 --- a/internal/commands/handler_del.go +++ b/internal/commands/handler_del.go @@ -4,7 +4,21 @@ import "fmt" // ACL categories: @keyspace, @write, @slow -func DelHandler(r ClientRequest) { +type DelHandler struct{} + +func (ch DelHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch DelHandler) Permissions() []string { + return nil +} + +func (ch DelHandler) Persistent() bool { + return false +} + +func (ch DelHandler) Handle(r ClientRequest) { r.Logger.Debug().Interface("Data", r.DataRaw.ReadArray()).Str("RawData", r.DataRaw.String()).Msg("del handler") delCount := 0 diff --git a/internal/commands/handler_get.go b/internal/commands/handler_get.go index a67cbb2..9231448 100644 --- a/internal/commands/handler_get.go +++ b/internal/commands/handler_get.go @@ -1,17 +1,29 @@ package commands import ( - "net" "pedis/internal/renderer" - "pedis/internal/storage" ) -func GetHandler(items [][]byte, store storage.Storage, conn net.Conn) { - val, err := store.Get(string(items[4])) +type GetHandler struct{} + +func (ch GetHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch GetHandler) Permissions() []string { + return nil +} + +func (ch GetHandler) Persistent() bool { + return true +} + +func (ch GetHandler) Handle(r ClientRequest) { + val, err := r.Store.Get(string(r.Data[4])) if err != nil { - conn.Write([]byte("-ERR key not found\r\n")) + r.Write([]byte("-ERR key not found\r\n")) } - r := renderer.BulkStringRenderer{} - conn.Write(r.Render(val)) + rr := renderer.BulkStringRenderer{} + r.Write(rr.Render(val)) } diff --git a/internal/commands/handler_hello.go b/internal/commands/handler_hello.go index d92e7fa..d7c6e58 100644 --- a/internal/commands/handler_hello.go +++ b/internal/commands/handler_hello.go @@ -2,13 +2,24 @@ package commands import ( "log" - "net" "pedis/internal/response" - "pedis/internal/storage" ) -func HelloHandler(_ [][]byte, _ storage.Storage, conn net.Conn) { - log.Println("Respond to hello command") +type HelloHandler struct{} + +func (ch HelloHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch HelloHandler) Permissions() []string { + return nil +} + +func (ch HelloHandler) Persistent() bool { + return false +} + +func (ch HelloHandler) Handle(r ClientRequest) { hr := response.HelloResponse{ Server: "redis", Version: "6.2.1", @@ -17,7 +28,7 @@ func HelloHandler(_ [][]byte, _ storage.Storage, conn net.Conn) { Role: "master", } - _, err := conn.Write(hr.Render()) + _, err := r.Write(hr.Render()) if err != nil { log.Println(err) diff --git a/internal/commands/handler_hexists.go b/internal/commands/handler_hexists.go index 2b3d52a..fa096ad 100644 --- a/internal/commands/handler_hexists.go +++ b/internal/commands/handler_hexists.go @@ -1,6 +1,20 @@ package commands -func HExistsHandler(r ClientRequest) { +type HExistsHandler struct{} + +func (ch HExistsHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch HExistsHandler) Permissions() []string { + return nil +} + +func (ch HExistsHandler) Persistent() bool { + return false +} + +func (ch HExistsHandler) Handle(r ClientRequest) { r.Logger.Debug().Str("key", string(r.Data[4])).Msg("hexists handler") data, err := r.Store.HGet(string(r.Data[4])) diff --git a/internal/commands/handler_hget.go b/internal/commands/handler_hget.go index 9ed539c..36beeee 100644 --- a/internal/commands/handler_hget.go +++ b/internal/commands/handler_hget.go @@ -1,9 +1,24 @@ package commands -func HGetHandler(r ClientRequest) { - r.Logger.Debug().Str("hset key", string(r.Data[4])).Msg("hget handler") +type HGetHandler struct{} - data, err := r.Store.HGet(string(r.Data[4])) +func (ch HGetHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch HGetHandler) Permissions() []string { + return nil +} + +func (ch HGetHandler) Persistent() bool { + return false +} + +func (ch HGetHandler) Handle(r ClientRequest) { + r.Logger.Debug().Interface("command", r.DataRaw.ReadArray()).Msg("hget handler") + + datat := r.DataRaw.ReadArray() + data, err := r.Store.HGet(datat[0]) if err != nil { _ = r.WriteNil() @@ -13,7 +28,7 @@ func HGetHandler(r ClientRequest) { hs := hset{} hs.FromBytes(data) - value, err := hs.Get(string(r.Data[6])) + value, err := hs.Get(datat[1]) if err != nil { _ = r.WriteNil() diff --git a/internal/commands/handler_hkeys.go b/internal/commands/handler_hkeys.go index 9252c04..6646ee4 100644 --- a/internal/commands/handler_hkeys.go +++ b/internal/commands/handler_hkeys.go @@ -1,6 +1,20 @@ package commands -func HKeysHandler(r ClientRequest) { +type HKeysHandler struct{} + +func (ch HKeysHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch HKeysHandler) Permissions() []string { + return nil +} + +func (ch HKeysHandler) Persistent() bool { + return false +} + +func (ch HKeysHandler) Handle(r ClientRequest) { r.Logger.Debug().Str("key", string(r.Data[4])).Msg("hkeys handler") data, err := r.Store.HGet(string(r.Data[4])) diff --git a/internal/commands/handler_hlen.go b/internal/commands/handler_hlen.go index 67401a6..42817ce 100644 --- a/internal/commands/handler_hlen.go +++ b/internal/commands/handler_hlen.go @@ -4,7 +4,21 @@ import ( "fmt" ) -func HLenHandler(r ClientRequest) { +type HLenHandler struct{} + +func (ch HLenHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch HLenHandler) Permissions() []string { + return nil +} + +func (ch HLenHandler) Persistent() bool { + return false +} + +func (ch HLenHandler) Handle(r ClientRequest) { r.Logger.Debug().Str("key", string(r.Data[4])).Msg("hlen handler") data, err := r.Store.HGet(string(r.Data[4])) diff --git a/internal/commands/handler_hset.go b/internal/commands/handler_hset.go index 28dae4d..5879685 100644 --- a/internal/commands/handler_hset.go +++ b/internal/commands/handler_hset.go @@ -5,12 +5,24 @@ import ( "encoding/gob" "errors" "fmt" - "log" "strings" ) -// HSetHandler -func HSetHandler(r ClientRequest) { +type HSetHandler struct{} + +func (ch HSetHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch HSetHandler) Permissions() []string { + return nil +} + +func (ch HSetHandler) Persistent() bool { + return true +} + +func (ch HSetHandler) Handle(r ClientRequest) { r.Logger.Info().Str("hset key", string(r.Data[4])).Msg("hset handler") hs := chunkSlice(r.Data[5:], 4) @@ -29,7 +41,7 @@ func HSetHandler(r ClientRequest) { } _ = hs.FromBytes(data) - _, _ = r.Conn.Write([]byte(fmt.Sprintf(":%d\r\n", hs.Len()))) + _, _ = r.Write([]byte(fmt.Sprintf(":%d\r\n", hs.Len()))) } type hasharray [][]byte @@ -95,7 +107,6 @@ func (hs hset) Values() []string { } func (hs hset) ToBytes() ([]byte, error) { - log.Println(hs) buf := new(bytes.Buffer) enc := gob.NewEncoder(buf) if err := enc.Encode(hs); err != nil { diff --git a/internal/commands/handler_hvals.go b/internal/commands/handler_hvals.go index da2c7a3..d38589c 100644 --- a/internal/commands/handler_hvals.go +++ b/internal/commands/handler_hvals.go @@ -1,6 +1,20 @@ package commands -func HValsHandler(r ClientRequest) { +type HValsHandler struct{} + +func (ch HValsHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch HValsHandler) Permissions() []string { + return nil +} + +func (ch HValsHandler) Persistent() bool { + return false +} + +func (ch HValsHandler) Handle(r ClientRequest) { r.Logger.Debug().Str("key", string(r.Data[4])).Msg("hvals handler") data, err := r.Store.HGet(string(r.Data[4])) diff --git a/internal/commands/handler_ping.go b/internal/commands/handler_ping.go new file mode 100644 index 0000000..03e973c --- /dev/null +++ b/internal/commands/handler_ping.go @@ -0,0 +1,27 @@ +package commands + +type PingHandler struct{} + +func (ch PingHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch PingHandler) Permissions() []string { + return []string{} +} + +func (ch PingHandler) Persistent() bool { + return false +} + +func (ch PingHandler) Handle(r ClientRequest) { + data := r.DataRaw.ReadArray() + r.Logger.Info().Str("Cmd", r.DataRaw.String()).Interface("ping params", data).Msg("") + + if len(data) == 0 { + r.WriteString("PONG") + return + } + + r.WriteString(data[0]) +} diff --git a/internal/commands/handler_set.go b/internal/commands/handler_set.go index bc928d6..528d7e6 100644 --- a/internal/commands/handler_set.go +++ b/internal/commands/handler_set.go @@ -1,40 +1,52 @@ package commands import ( - "net" - "pedis/internal/storage" "strconv" ) -func SetHandler(items [][]byte, store storage.Storage, conn net.Conn) { - value := string(items[6]) +type SetHandler struct{} + +func (ch SetHandler) Authorize(ClientRequest) error { + return nil +} + +func (ch SetHandler) Permissions() []string { + return nil +} + +func (ch SetHandler) Persistent() bool { + return true +} + +func (ch SetHandler) Handle(r ClientRequest) { + value := string(r.Data[6]) if len(value) == 0 { - _, _ = conn.Write([]byte("-ERR value is empty\r\n")) + _, _ = r.Write([]byte("-ERR value is empty\r\n")) return } - key := string(items[4]) + key := string(r.Data[4]) if len(key) == 0 { - _, _ = conn.Write([]byte("-ERR key is empty\r\n")) + _, _ = r.Write([]byte("-ERR key is empty\r\n")) return } exp := 0 - if len(items) > 8 { + if len(r.Data) > 8 { var err error - exp, err = strconv.Atoi(string(items[10])) + exp, err = strconv.Atoi(string(r.Data[10])) if err != nil { - _, _ = conn.Write([]byte("-ERR expiration cannot be casted to number\r\n")) + _, _ = r.Write([]byte("-ERR expiration cannot be casted to number\r\n")) return } } - err := store.Set(key, value, int64(exp)) + err := r.Store.Set(key, value, int64(exp)) if err != nil { - _, _ = conn.Write([]byte("-ERR error\r\n")) + _, _ = r.Write([]byte("-ERR error\r\n")) return } - _, _ = conn.Write([]byte("+OK\r\n")) + _, _ = r.Write([]byte("+OK\r\n")) } diff --git a/internal/storage/simple.go b/internal/storage/simple.go index 4a38b6d..8dbcb2d 100644 --- a/internal/storage/simple.go +++ b/internal/storage/simple.go @@ -2,21 +2,25 @@ package storage import ( "errors" + "fmt" "sync" "time" ) type SimpleStorage struct { - data map[string]StorageDataInternal + data map[string]StorageDataInternal + sync.RWMutex + acl map[string]*User + aclLock sync.RWMutex exp map[string]time.Time - proposeChan chan StorageData expLock sync.RWMutex - sync.RWMutex + proposeChan chan StorageData } func NewSimpleStorage(proposeChan chan StorageData) *SimpleStorage { return &SimpleStorage{ data: make(map[string]StorageDataInternal), + acl: make(map[string]*User), exp: make(map[string]time.Time), proposeChan: proposeChan, } @@ -45,6 +49,73 @@ func (ss *SimpleStorage) HGet(key string) ([]byte, error) { return val.D, nil } +func (ss *SimpleStorage) GetUser(username string) (*User, error) { + ss.aclLock.Lock() + u, ok := ss.acl[username] + ss.aclLock.Unlock() + + if !ok { + return nil, fmt.Errorf("User %s not found in storage", username) + } + + return u, nil +} +func (ss *SimpleStorage) SetUser(username string, rules []AclRule) error { + ss.aclLock.Lock() + defer ss.aclLock.Unlock() + + u, ok := ss.acl[username] + if !ok { + ss.acl[username] = &User{} + } + + u = ss.acl[username] + for _, r := range rules { + switch r.Type { + case AclActivateUser: + u.Active = true + case AclDisableUser: + u.Active = false + case AclAnyPassword: + u.AnyPassword = true + case AclSetUserPassword: + u.Passwords = append(u.Passwords, r.Value) + default: + return fmt.Errorf("acl rule not supported (%v)", r.Type) + } + } + + return nil +} + +func (ss *SimpleStorage) DelUser(key string) error { + ss.aclLock.Lock() + _, ok := ss.acl[key] + ss.aclLock.Unlock() + + if !ok { + return fmt.Errorf("User (%s) Not found", key) + } + + ss.aclLock.Lock() + delete(ss.acl, key) + ss.aclLock.Unlock() + + return nil +} + +func (ss *SimpleStorage) Users() []string { + users := []string{} + + ss.aclLock.RLock() + for u, _ := range ss.acl { + users = append(users, u) + } + ss.aclLock.RUnlock() + + return users +} + func (ss *SimpleStorage) Del(key string) error { _, err := ss.Get(key) diff --git a/internal/storage/simple_test.go b/internal/storage/simple_test.go new file mode 100644 index 0000000..044431f --- /dev/null +++ b/internal/storage/simple_test.go @@ -0,0 +1,64 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSetUser(t *testing.T) { + s := NewSimpleStorage(make(chan StorageData)) + + err := s.SetUser("pathe", []AclRule{}) + require.NoError(t, err) + + t.Run("allow user to login with any password", func(t *testing.T) { + t.Parallel() + s := NewSimpleStorage(make(chan StorageData)) + + err := s.SetUser("pathe", []AclRule{ + AclRule{Type: AclAnyPassword}, + }) + require.NoError(t, err) + + u, err := s.GetUser("pathe") + require.NoError(t, err) + assert.Equal(t, true, u.AnyPassword) + }) + + t.Run("can assign password to user", func(t *testing.T) { + t.Parallel() + s := NewSimpleStorage(make(chan StorageData)) + + err := s.SetUser("pathe", []AclRule{ + AclRule{Type: AclSetUserPassword, Value: "mypwd"}, + }) + require.NoError(t, err) + + u, err := s.GetUser("pathe") + require.NoError(t, err) + assert.Equal(t, []string{"mypwd"}, u.Passwords) + }) + + t.Run("can activate a user", func(t *testing.T) { + t.Parallel() + s := NewSimpleStorage(make(chan StorageData)) + + err := s.SetUser("pathe", []AclRule{ + AclRule{Type: AclActivateUser}, + }) + require.NoError(t, err) + + u, err := s.GetUser("pathe") + require.NoError(t, err) + assert.Equal(t, true, u.Active) + + err = s.SetUser("pathe", []AclRule{ + AclRule{Type: AclDisableUser}, + }) + require.NoError(t, err) + + assert.Equal(t, false, u.Active) + }) +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 008b390..fa817ae 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -1,11 +1,16 @@ package storage type Storage interface { - Del(key string) error + // Users + GetUser(key string) (*User, error) + SetUser(key string, rule []AclRule) error + DelUser(key string) error + Users() []string // Simple strings Set(key string, value string, expires int64) error Get(key string) (string, error) + Del(key string) error // Maps HGet(key string) ([]byte, error) diff --git a/internal/storage/user.go b/internal/storage/user.go new file mode 100644 index 0000000..8f96a79 --- /dev/null +++ b/internal/storage/user.go @@ -0,0 +1,41 @@ +package storage + +import ( + "errors" +) + +type AclRuleType string + +var ( + AclActivateUser AclRuleType = "on" + AclDisableUser AclRuleType = "off" + AclResetUser AclRuleType = "reset" + AclSetUserPassword AclRuleType = "setuserpassword" + AclAnyPassword AclRuleType = "nopass" + AclClearSelectors AclRuleType = "clearselectors" + AclNoCommands AclRuleType = "nocommands" +) + +type AclRule struct { + Type AclRuleType + Value string +} + +type User struct { + Passwords []string + Active bool + AnyPassword bool +} + +func (u User) Authenticate(password string) error { + for _, p := range u.Passwords { + if p == password { + return nil + } + } + return errors.New("Password auth failed") +} + +func (u User) Authorize(command string) error { + return nil +} diff --git a/main.go b/main.go index 727f8e6..5f46ada 100644 --- a/main.go +++ b/main.go @@ -15,27 +15,49 @@ package main import ( + "context" "flag" + "log" "os" "pedis/internal/storage" "pedis/praft" "strings" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" + "github.com/redis/go-redis/v9" "go.etcd.io/etcd/raft/v3/raftpb" ) func main() { - log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) - cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers") id := flag.Int("id", 1, "node ID") - kvport := flag.Int("port", 9121, "key-value server port") join := flag.Bool("join", false, "join an existing cluster") - pedis := flag.String("pedis", "localhost:6379", "port where pedis server is running") + pedis := flag.String("pedis", "0.0.0.0:6379", "port where pedis server is running") + healthcheck := flag.Bool("health", false, "allow docker and kubernetes to check the health") flag.Parse() + if *healthcheck { + client := redis.NewClient(&redis.Options{ + Addr: *pedis, + Password: "", + DB: 0, + }) + + log.Println("Running ping command") + pong, err := client.Ping(context.Background()).Result() + if err != nil { + log.Fatal(err) + os.Exit(1) + } + + if strings.ToLower(pong) != "pong" { + log.Fatalf("Received unexpected response (%v)", pong) + os.Exit(1) + } + + log.Println("Server healthy") + os.Exit(0) + } + proposeC := make(chan string) defer close(proposeC) @@ -45,13 +67,34 @@ func main() { storageProposeChan := make(chan storage.StorageData) defer close(storageProposeChan) - // raft provides a commit stream for the proposals from the http api var kvs *praft.PedisServer getSnapshot := func() ([]byte, error) { return kvs.GetSnapshot() } - commitC, errorC, snapshotterReady := praft.NewRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC) + commitC, errorC, snapshotterReady := praft.NewRaftNode( + *id, + strings.Split(*cluster, ","), + *join, + getSnapshot, + proposeC, + confChangeC, + ) - kvs = praft.NewKVStore(<-snapshotterReady, proposeC, commitC, errorC, storage.NewSimpleStorage(storageProposeChan), *pedis, storageProposeChan) + kvs = praft.NewKVStore( + <-snapshotterReady, + proposeC, + commitC, + errorC, + storage.NewSimpleStorage(storageProposeChan), + *pedis, + storageProposeChan, + confChangeC, + ) - // the key-value http handler will propose updates to raft - praft.ServeHTTPKVAPI(kvs, *kvport, confChangeC, errorC) + go func() { + if err := kvs.StartPedis(); err != nil { + log.Fatal(err) + } + }() + if err, ok := <-errorC; ok { + log.Fatal(err) + } } diff --git a/praft/httpapi.go b/praft/httpapi.go deleted file mode 100644 index 2b133a3..0000000 --- a/praft/httpapi.go +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package praft - -import ( - "io" - "log" - "net/http" - "strconv" - - "go.etcd.io/etcd/raft/v3/raftpb" -) - -// Handler for a http based key-value store backed by raft -type httpKVAPI struct { - store *PedisServer - confChangeC chan<- raftpb.ConfChange -} - -func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { - key := r.RequestURI - defer r.Body.Close() - switch r.Method { - case http.MethodPut: - v, err := io.ReadAll(r.Body) - if err != nil { - log.Printf("Failed to read on PUT (%v)\n", err) - http.Error(w, "Failed on PUT", http.StatusBadRequest) - return - } - - log.Printf("command change (%v)\n", v) - // h.store.Propose(key, string(v)) - - // Optimistic-- no waiting for ack from raft. Value is not yet - // committed so a subsequent GET on the key may return old value - w.WriteHeader(http.StatusNoContent) - case http.MethodGet: - if v, ok := h.store.Lookup(key); ok { - w.Write([]byte(v)) - } else { - http.Error(w, "Failed to GET", http.StatusNotFound) - } - case http.MethodPost: - url, err := io.ReadAll(r.Body) - if err != nil { - log.Printf("Failed to read on POST (%v)\n", err) - http.Error(w, "Failed on POST", http.StatusBadRequest) - return - } - - nodeID, err := strconv.ParseUint(key[1:], 0, 64) - if err != nil { - log.Printf("Failed to convert ID for conf change (%v)\n", err) - http.Error(w, "Failed on POST", http.StatusBadRequest) - return - } - - cc := raftpb.ConfChange{ - Type: raftpb.ConfChangeAddNode, - NodeID: nodeID, - Context: url, - } - h.confChangeC <- cc - // As above, optimistic that raft will apply the conf change - w.WriteHeader(http.StatusNoContent) - case http.MethodDelete: - nodeID, err := strconv.ParseUint(key[1:], 0, 64) - if err != nil { - log.Printf("Failed to convert ID for conf change (%v)\n", err) - http.Error(w, "Failed on DELETE", http.StatusBadRequest) - return - } - - cc := raftpb.ConfChange{ - Type: raftpb.ConfChangeRemoveNode, - NodeID: nodeID, - } - h.confChangeC <- cc - - // As above, optimistic that raft will apply the conf change - w.WriteHeader(http.StatusNoContent) - default: - w.Header().Set("Allow", http.MethodPut) - w.Header().Add("Allow", http.MethodGet) - w.Header().Add("Allow", http.MethodPost) - w.Header().Add("Allow", http.MethodDelete) - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - } -} - -// serveHTTPKVAPI starts a key-value server with a GET/PUT API and listens. -func ServeHTTPKVAPI(kv *PedisServer, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) { - srv := http.Server{ - Addr: ":" + strconv.Itoa(port), - Handler: &httpKVAPI{ - store: kv, - confChangeC: confChangeC, - }, - } - go func() { - if err := srv.ListenAndServe(); err != nil { - log.Fatal(err) - } - }() - - go func() { - if err := kv.StartPedis(); err != nil { - log.Fatal(err) - } - }() - // exit when raft goes down - if err, ok := <-errorC; ok { - log.Fatal(err) - } -} diff --git a/praft/kvstore.go b/praft/kvstore.go index c4af972..b8fdb77 100644 --- a/praft/kvstore.go +++ b/praft/kvstore.go @@ -18,6 +18,7 @@ import ( "bytes" "encoding/gob" "encoding/json" + "errors" "log" "net" "os" @@ -38,10 +39,11 @@ type RedisCommand interface { // a key-value store backed by raft type PedisServer struct { - proposeC chan<- string // channel for proposing updates - mu sync.RWMutex - kvStore map[string]string // current committed key-value pairs - snapshotter *snap.Snapshotter + proposeC chan<- string // channel for proposing updates + clusterChangesChan chan<- raftpb.ConfChange + mu sync.RWMutex + kvStore map[string]string // current committed key-value pairs + snapshotter *snap.Snapshotter handlers map[string]RedisCommand store storage.Storage @@ -49,7 +51,8 @@ type PedisServer struct { storageProposeChan chan storage.StorageData - logger zerolog.Logger + logger zerolog.Logger + listener net.Listener } func NewPedisServer( @@ -78,6 +81,7 @@ func NewKVStore( store storage.Storage, pedisAddr string, storageProposeChan chan storage.StorageData, + clusterConfChan chan raftpb.ConfChange, ) *PedisServer { s := &PedisServer{ proposeC: proposeC, @@ -87,6 +91,7 @@ func NewKVStore( addr: pedisAddr, store: store, storageProposeChan: storageProposeChan, + clusterChangesChan: clusterConfChan, logger: zerolog.New( zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339}, ).With().Timestamp().Logger(), @@ -114,6 +119,7 @@ func (rs *PedisServer) AddHandler(firstByte string, c RedisCommand) error { return nil } + func (s *PedisServer) StartPedis() error { listener, err := net.Listen("tcp", s.addr) if err != nil { @@ -121,6 +127,7 @@ func (s *PedisServer) StartPedis() error { } defer listener.Close() + s.listener = listener for { s.logger.Debug().Msg("received new connection") @@ -153,11 +160,12 @@ func (rs *PedisServer) handleConnection(conn net.Conn) { } request := commands.ClientRequest{ - Conn: conn, - Data: bytes.Split(b[1:size], []byte{13, 10}), - Store: rs.store, - Logger: rs.logger, - DataRaw: commands.RawRequest(b[0:size]), + Conn: conn, + Data: bytes.Split(b[1:size], []byte{13, 10}), + Store: rs.store, + Logger: rs.logger, + DataRaw: commands.RawRequest(b[0:size]), + ClusterChangesChan: rs.clusterChangesChan, } handler.Run(request) @@ -227,6 +235,13 @@ func (s *PedisServer) GetSnapshot() ([]byte, error) { return json.Marshal(s.kvStore) } +func (s *PedisServer) PedisAddr() (*net.TCPAddr, error) { + if s.listener == nil { + return nil, errors.New("listener is not started or assigned") + } + return s.listener.Addr().(*net.TCPAddr), nil +} + func (s *PedisServer) loadSnapshot() (*raftpb.Snapshot, error) { snapshot, err := s.snapshotter.Load() if err == snap.ErrNoSnapshot { diff --git a/praft/kvstore_test.go b/praft/kvstore_test.go index dabc8eb..0daa81f 100644 --- a/praft/kvstore_test.go +++ b/praft/kvstore_test.go @@ -14,35 +14,9 @@ package praft -//func Test_kvstore_snapshot(t *testing.T) { -// tm := map[string]string{"foo": "bar"} -// s := &kvstore{kvStore: tm} -// -// v, _ := s.Lookup("foo") -// if v != "bar" { -// t.Fatalf("foo has unexpected value, got %s", v) -// } -// -// data, err := s.getSnapshot() -// if err != nil { -// t.Fatal(err) -// } -// s.kvStore = nil -// -// if err := s.recoverFromSnapshot(data); err != nil { -// t.Fatal(err) -// } -// v, _ = s.Lookup("foo") -// if v != "bar" { -// t.Fatalf("foo has unexpected value, got %s", v) -// } -// if !reflect.DeepEqual(s.kvStore, tm) { -// t.Fatalf("store expected %+v, got %+v", tm, s.kvStore) -// } -//} - import ( "context" + "fmt" "pedis/internal/storage" "testing" "time" @@ -52,22 +26,40 @@ import ( "github.com/stretchr/testify/require" ) -func TestServerSetAndGet(t *testing.T) { +func initClientAndServer(t *testing.T, port int) (*PedisServer, *redis.Client) { storageProposeChan := make(chan storage.StorageData) s := NewPedisServer( - "localhost:6379", + fmt.Sprintf("127.0.0.1:%d", port), storage.NewSimpleStorage(storageProposeChan), ) + client := redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("127.0.0.1:%d", port), + Password: "", + DB: 0, + DisableIndentity: true, + }) + return s, client +} + +func TestCluster(t *testing.T) { + s, client := initClientAndServer(t, 9000) go s.StartPedis() ctx := context.Background() - client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 0, + + t.Run("CLUSTER MEET", func(t *testing.T) { + _, err := client.Do(ctx, "cluster", "meet", "2", "http://127.0.0.1:22222").Result() + + require.NoError(t, err) }) +} + +func TestServerSetAndGet(t *testing.T) { + ctx := context.Background() + s, client := initClientAndServer(t, 9001) + go s.StartPedis() t.Run("Can set a value with expiration date", func(t *testing.T) { err := client.Set(context.Background(), "key", "value", 2*time.Minute).Err() @@ -87,6 +79,12 @@ func TestServerSetAndGet(t *testing.T) { assert.Equal(t, int64(1), resultDel) }) +} + +func TestDEL(t *testing.T) { + ctx := context.Background() + s, client := initClientAndServer(t, 9002) + go s.StartPedis() t.Run("DEL", func(t *testing.T) { t.Parallel() @@ -109,9 +107,163 @@ func TestServerSetAndGet(t *testing.T) { assert.Equal(t, int64(1), resultDel) }) }) +} + +func TestACLCat(t *testing.T) { + ctx := context.Background() + s, client := initClientAndServer(t, 9003) + go s.StartPedis() + + t.Parallel() + + t.Run("CAT", func(t *testing.T) { + t.SkipNow() + _, err := client.Do(ctx, "acl", "cat").Result() + + require.NoError(t, err) + }) + + t.Run("CAT dangerous", func(t *testing.T) { + t.SkipNow() + _, err := client.Do(ctx, "acl", "cat", "dangerous").Result() + + require.NoError(t, err) + }) + + t.Run("CAT dangerous", func(t *testing.T) { + t.SkipNow() + _, err := client.Do(ctx, "acl", "cat", "dangerous").Result() + + require.NoError(t, err) + }) +} + +func TestACLAuth(t *testing.T) { + ctx := context.Background() + s, client := initClientAndServer(t, 9004) + go s.StartPedis() + + t.Run("AUTH-1", func(t *testing.T) { + existingUser := "existingUser" + user404 := "user:404" + + _, err := client.Do(ctx, "acl", "setuser", existingUser, "on", ">weak-password:").Result() + require.NoError(t, err) + + _, err = client.Do(ctx, "auth", existingUser).Result() + require.Error(t, err) + + _, err = client.Do(ctx, "auth", existingUser, "weak-password").Result() + require.NoError(t, err) + + _, err = client.Do(ctx, "auth", user404, "weak-password").Result() + require.Error(t, err) + }) +} + +func TestACLSetUser(t *testing.T) { + ctx := context.Background() + s, client := initClientAndServer(t, 9005) + go s.StartPedis() + + t.Run("SETUSER-1", func(t *testing.T) { + _, err := client.Do(ctx, "acl", "setuser", "pathe-s").Result() + require.NoError(t, err) + + _, err = client.Do(ctx, "acl", "deluser", "pathe-s", "mado-1").Result() + require.NoError(t, err) + }) + + t.Run("SETUSER-2", func(t *testing.T) { + _, err := client.Do(ctx, "acl", "setuser", "pathe-s", "on").Result() + require.NoError(t, err) + + _, err = client.Do(ctx, "acl", "deluser", "pathe-s", "mado-1").Result() + require.NoError(t, err) + }) +} + +func TestACLGetUser(t *testing.T) { + ctx := context.Background() + s, client := initClientAndServer(t, 9006) + go s.StartPedis() + + t.Run("GETUSER", func(t *testing.T) { + t.SkipNow() + _, err := client.Do(ctx, "acl", "getuser", "pathe").Result() + + require.NoError(t, err) + }) +} + +func TestACLUsers(t *testing.T) { + ctx := context.Background() + s, client := initClientAndServer(t, 9007) + go s.StartPedis() + + t.Run("USERS-1", func(t *testing.T) { + list, err := client.Do(ctx, "acl", "users").Result() + + require.NoError(t, err) + assert.Equal(t, []interface{}{}, list) + }) + + t.Run("USERS-2", func(t *testing.T) { + _, _ = client.Do(ctx, "acl", "setuser", "acl-user-1").Result() + _, _ = client.Do(ctx, "acl", "setuser", "acl-user-2").Result() + + list, err := client.Do(ctx, "acl", "users").Result() + + require.NoError(t, err) + assert.Equal(t, []interface{}([]interface{}{"acl-user-1", "acl-user-2"}), list) + + }) +} + +func TestACLDelUser(t *testing.T) { + ctx := context.Background() + s, client := initClientAndServer(t, 9008) + go s.StartPedis() + + t.Run("DELUSER-1", func(t *testing.T) { + _, _ = client.Do(ctx, "acl", "setuser", "pathe-1").Result() + _, _ = client.Do(ctx, "acl", "setuser", "mado-1").Result() + count, err := client.Do(ctx, "acl", "deluser", "pathe-1", "mado-1").Result() + + require.NoError(t, err) + assert.Equal(t, int64(2), count) + }) + + t.Run("DELUSER-2", func(t *testing.T) { + _, _ = client.Do(ctx, "acl", "setuser", "pathe-2").Result() + count, err := client.Do(ctx, "acl", "deluser", "pathe-2", "mado").Result() + + require.NoError(t, err) + assert.Equal(t, int64(1), count) + }) + + t.Run("DELUSER-3", func(t *testing.T) { + count, err := client.Do(ctx, "acl", "deluser", "pathe-3", "mado").Result() + + require.NoError(t, err) + assert.Equal(t, int64(0), count) + }) + + t.Run("DRYRUN", func(t *testing.T) { + t.SkipNow() + _, err := client.Do(ctx, "acl", "dryrun", "pathe", "get", "foo").Result() + + require.NoError(t, err) + }) +} + +func TestGetSet(t *testing.T) { + ctx := context.Background() + s, client := initClientAndServer(t, 9009) + go s.StartPedis() t.Run("Cannot set a key with empty value", func(t *testing.T) { - err := client.Set(context.Background(), "key", "", 0).Err() + err := client.Set(ctx, "key", "", 0).Err() assert.Equal(t, err.Error(), "ERR value is empty") }) @@ -128,20 +280,9 @@ func TestServerSetAndGet(t *testing.T) { } func TestServerHSetAndHGet(t *testing.T) { - storageProposeChan := make(chan storage.StorageData) - s := NewPedisServer( - "localhost:6379", - storage.NewSimpleStorage(storageProposeChan), - ) - + s, client := initClientAndServer(t, 9010) go s.StartPedis() - client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 0, - }) - t.Run("Can set and get a hash", func(t *testing.T) { // m := map[string]interface{}{"key-one": "one value", "key-two": "two value"} // err = client.HMSet(context.Background(), "myhash", m, 0).Err() diff --git a/praft/raft.go b/praft/raft.go index 272423e..538eae4 100644 --- a/praft/raft.go +++ b/praft/raft.go @@ -323,9 +323,11 @@ func (rc *raftNode) startRaft() { } rc.transport.Start() - for i := range rc.peers { - if i+1 != rc.id { - rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]}) + if len(rc.peers) > 1 { + for i := range rc.peers { + if i+1 != rc.id { + rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]}) + } } } @@ -510,6 +512,7 @@ func (rc *raftNode) serveRaft() { log.Fatalf("raftexample: Failed parsing URL (%v)", err) } + rc.logger.Info("started serving raft requests on ", zap.String("url", url.String())) ln, err := newStoppableListener(url.Host, rc.httpstopc) if err != nil { log.Fatalf("raftexample: Failed to listen rafthttp (%v)", err)