Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge pull request #9 from PapePathe/hash-commands #10

Merged
merged 21 commits into from
Apr 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .ko.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
defaultPlatforms:
- linux/arm64
- linux/amd64
- linux/arm64/v8
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ COPY --from=build-stage /pedis/pedis /pedis

EXPOSE 6379
EXPOSE 12380
EXPOSE 1237
EXPOSE 12379

# USER nonroot:nonroot

ENTRYPOINT /pedis -id $ID -pedis $PEDIS -cluster $CLUSTER -port $PORT
ENTRYPOINT /pedis -id $ID -pedis $PEDIS -cluster $CLUSTER -port $PORT -join $JOIN
17 changes: 17 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
clean:
rm -rf /tmp/pedis*

build-image:
ko build

start-primary:
go run main.go --cluster http://127.0.0.1:12379

start-secondary:
go run main.go --id 2 --join --pedis 127.0.0.1:6389 --cluster http://127.0.0.1:12379,http://127.0.0.1:12380

start-tertiary:
go run main.go --id 3 --pedis 127.0.0.1:6390 --cluster http://127.0.0.1:12379,http://127.0.0.1:12380,http://127.0.0.1:12381

test:
go test -v ./... -race
22 changes: 21 additions & 1 deletion cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,26 @@ package main

import (
"context"
"flag"
"fmt"
"log"

"github.com/redis/go-redis/v9"
)

var reqCount int

func main() {
flag.IntVar(&reqCount, "req", 100, "")
flag.Parse()

client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})

for i := 0; i < 2; i++ {
for i := 0; i < reqCount; i++ {
key := fmt.Sprintf("key-%d", i)
err := client.Set(context.Background(), key, "value", 0).Err()
if err != nil {
Expand All @@ -29,5 +35,19 @@ func main() {
}

log.Println("Get result", result)

key = fmt.Sprintf("hset:key-%d", i)
err = client.HSet(context.Background(), key, "name", "pathe").Err()
if err != nil {
log.Println(err)
}

resultHset, err := client.HGet(context.Background(), key, "name").Result()

if err != nil {
log.Println(err)
}

log.Println("Hset result", resultHset)
}
}
49 changes: 18 additions & 31 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,41 +1,28 @@
version: "3.4"
x-app: &default-pedis
image: ko.local/ppathe/pedis/pedis-9e6a4042beefc3cd7105eec240e5b3ab:c3cb180cc2aba05bc6202089b4d76c334f18c7a2a5fdf9e2f53691d1bfcce727
healthcheck:
test: /ko-app/pedis --health
interval: 1s
retries: 5
start_period: 1s
timeout: 10s
services:
## memtier:
## image: redis
## command: redis-benchmark -h pedis-a
## depends_on:
## - pedis-a
##
memtier:
image: redislabs/memtier_benchmark
command: memtier_benchmark -s pedis-a -p 6379 -R -n allkeys -d 50 -n 10 --command="hset __key__ name Pathe country Senegal" --command-ratio=2 --command-key-pattern=G
pedis-a:
build: .
environment:
ID: 1
PEDIS: :6379
PORT: 12380
CLUSTER: http://pedis-a:12379,http://pedis-b:12379,http://pedis-c:12379
<<: *default-pedis
container_name: pedis-a
entrypoint: /ko-app/pedis --cluster http://pedis-a:12379
ports:
- "6379:6379"
pedis-b:
environment:
ID: 2
PEDIS: :6379
PORT: 12380
CLUSTER: http://pedis-a:12379,http://pedis-b:12379,http://pedis-c:12379
build: .
<<: *default-pedis
container_name: pedis-b
entrypoint: /ko-app/pedis --id 2 --join --cluster http://pedis-a:12379,http://pedis-b:12379
depends_on:
- pedis-a
ports:
- "6389:6389"
pedis-c:
environment:
ID: 3
PEDIS: :6379
PORT: 12380
CLUSTER: http://pedis-a:12379,http://pedis-b:12379,http://pedis-c:12379
build: .
<<: *default-pedis
container_name: pedis-c
entrypoint: /ko-app/pedis --id 3 --join --cluster http://pedis-a:12379,http://pedis-b:12379,http://pedis-c:12379
depends_on:
- pedis-a
ports:
- "6399:6399"
28 changes: 18 additions & 10 deletions internal/commands/client_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package commands
import (
"bytes"
"fmt"
"log"
"net"
"pedis/internal/renderer"
"pedis/internal/storage"
"strings"

"github.com/rs/zerolog"
"go.etcd.io/etcd/raft/v3/raftpb"
)

type IClientRequest interface {
Expand Down Expand Up @@ -47,8 +47,6 @@ func (r RawRequest) ReadArray() []string {
sl := SliceAsChunks(items[3:], 2)
array := []string{}

log.Println(sl)

for _, i := range sl {
if len(i) == 2 {
array = append(array, string(i[1]))
Expand All @@ -59,11 +57,12 @@ func (r RawRequest) ReadArray() []string {
}

type ClientRequest struct {
Conn net.Conn
Data [][]byte
DataRaw RawRequest
Store storage.Storage
Logger zerolog.Logger
Conn net.Conn
Data [][]byte
DataRaw RawRequest
Store storage.Storage
Logger zerolog.Logger
ClusterChangesChan chan<- raftpb.ConfChange
}

func (c ClientRequest) WriteError(s string) error {
Expand Down Expand Up @@ -106,6 +105,10 @@ func (c ClientRequest) WriteNumber(s string) error {
}

func (c ClientRequest) WriteOK() error {
_, err := c.Conn.Write([]byte("+OK\r\n"))
if err != nil {
return fmt.Errorf("net write error (%v)", err)
}
return nil
}

Expand All @@ -117,6 +120,11 @@ func (c ClientRequest) WriteNil() error {
return nil
}

func (c ClientRequest) Write([]byte) (int, error) {
return 0, nil
func (c ClientRequest) Write(data []byte) (int, error) {
n, err := c.Conn.Write(data)
if err != nil {
return 0, fmt.Errorf("net write error (%v)", err)
}

return n, nil
}
45 changes: 28 additions & 17 deletions internal/commands/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,28 @@ func NewRequestHandler() *RequestHandler {

func DefaultRequestHandler() *RequestHandler {
subcommands := map[string]CommandHandler{
"del": DelHandler,
// cluster commands
"cluster": ClusterHandler{},

// acl commands
"acl": AclHandler{},

"del": DelHandler{},
"get": GetHandler{},
"set": SetHandler{},

// hash related commands
"hexists": HExistsHandler,
"hget": HGetHandler,
"hkeys": HKeysHandler,
"hlen": HLenHandler,
"hset": HSetHandler,
"hvals": HValsHandler,
"config": ConfigHandler,
"hexists": HExistsHandler{},
"hget": HGetHandler{},
"hkeys": HKeysHandler{},
"hlen": HLenHandler{},
"hset": HSetHandler{},
"hvals": HValsHandler{},

"config": ConfigHandler{},
"hello": HelloHandler{},
"auth": AuthHandler{},
"ping": PingHandler{},
}
return &RequestHandler{subcommands}
}
Expand All @@ -32,21 +45,19 @@ func (s RequestHandler) Run(request ClientRequest) {
subcommand := strings.ToLower(string(request.Data[2]))

if h, ok := s.subcommands[subcommand]; ok {
go h(request)
if err := h.Authorize(request); err != nil {
request.WriteError("not authorized to access command")
return
}
go h.Handle(request)
} else {
switch subcommand {
case "hello":
go HelloHandler(request.Data, request.Store, request.Conn)
case "get":
go GetHandler(request.Data, request.Store, request.Conn)
case "set":
go SetHandler(request.Data, request.Store, request.Conn)
case "client":
request.WriteString("OK")
request.Logger.Debug().Msg("going to execute client options command")
default:
request.WriteError(fmt.Sprintf("command not supported %v", string(request.Data[2])))
request.Logger.Debug().Str("command", string(request.Data[2])).Msg("is not yet supported")
request.WriteError(fmt.Sprintf("command not supported %v", subcommand))
request.Logger.Debug().Str("command", subcommand).Msg("is not yet supported")
}
}
}
11 changes: 10 additions & 1 deletion internal/commands/handler.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
package commands

type CommandHandler func(ClientRequest)
type CommandHandler interface {
// Runs the request and write response to client
Handle(ClientRequest)
// Checks that calling user has the permissions required by the command
Authorize(ClientRequest) error
// Returns the list of permissions required to run the command
Permissions() []string
// Returns true if the command is going to persist data
Persistent() bool
}
105 changes: 105 additions & 0 deletions internal/commands/handler_acl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package commands

import (
"fmt"
"pedis/internal/storage"
)

type AclHandler struct{}

func (ch AclHandler) Authorize(ClientRequest) error {
return nil
}

func (ch AclHandler) Permissions() []string {
return nil
}

func (ch AclHandler) Persistent() bool {
return false
}

func (ch AclHandler) Handle(r ClientRequest) {
data := r.DataRaw.ReadArray()
r.Logger.
Debug().
Interface("Data", data).
Interface("RawData", r.DataRaw.String()).
Msg("")

svc := aclService{}

switch data[0] {
case "setuser":
_ = svc.setuser(r)
case "deluser":
_ = svc.deluser(r)
case "users":
_ = svc.users(r)
default:
r.WriteError(fmt.Sprintf("(%s) not implemented by devin", data[0]))
}
}

type aclService struct{}

func (aclService) deluser(r ClientRequest) error {
data := r.DataRaw.ReadArray()
r.Logger.Debug().Interface("usernames", data[1:]).Msg("Going to delete")
delCount := 0

for _, u := range data[1:] {
err := r.Store.DelUser(u)

if err == nil {
delCount++
}
}

r.WriteNumber(fmt.Sprintf("%d", delCount))

return nil
}

func (aclService) setuser(r ClientRequest) error {
data := r.DataRaw.ReadArray()
r.Logger.Debug().Msg("Going to create or update existing user")
username := data[1]
rules := []storage.AclRule{}

if len(data) >= 2 {
for _, elem := range data[2:] {
switch elem {
case "on":
rules = append(rules, storage.AclRule{Type: storage.AclActivateUser})
case "off":
rules = append(rules, storage.AclRule{Type: storage.AclDisableUser})
case "nopass":
rules = append(rules, storage.AclRule{Type: storage.AclDisableUser})
case "reset":
rules = append(rules, storage.AclRule{Type: storage.AclResetUser})
default:
switch elem[0] {
case '>':
rules = append(rules, storage.AclRule{Type: storage.AclSetUserPassword, Value: elem[1 : len(elem)-1]})
default:
r.WriteError(fmt.Sprintf("acl rule (%s) not supported", elem))
}
}
}
}

_ = r.Store.SetUser(username, rules)
r.WriteOK()
return nil
}

func (aclService) users(r ClientRequest) error {
r.Logger.Debug().Msg("Going to list users")

users := r.Store.Users()
r.Logger.Debug().Interface("Users list", users).Msg("")
r.WriteArray(users)

return nil
}
Loading
Loading